Skip to content

Commit a78647e

Browse files
authored
Destinations CDK: CatalogParser sets default namespace (#38121)
1 parent c019d32 commit a78647e

File tree

18 files changed

+123
-177
lines changed

18 files changed

+123
-177
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
177178
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
178179
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
179180
| 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+8-33
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.integrations.destination.async
66

77
import com.google.common.base.Preconditions
8-
import com.google.common.base.Strings
98
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
109
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
1110
import io.airbyte.cdk.integrations.destination.async.buffers.BufferEnqueue
@@ -28,7 +27,6 @@ import java.util.concurrent.ExecutorService
2827
import java.util.concurrent.Executors
2928
import java.util.concurrent.atomic.AtomicLong
3029
import java.util.function.Consumer
31-
import kotlin.jvm.optionals.getOrNull
3230
import org.jetbrains.annotations.VisibleForTesting
3331

3432
private val logger = KotlinLogging.logger {}
@@ -51,7 +49,6 @@ constructor(
5149
onFlush: DestinationFlushFunction,
5250
private val catalog: ConfiguredAirbyteCatalog,
5351
private val bufferManager: BufferManager,
54-
private val defaultNamespace: Optional<String>,
5552
private val flushFailure: FlushFailure = FlushFailure(),
5653
workerPool: ExecutorService = Executors.newFixedThreadPool(5),
5754
private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
@@ -79,28 +76,6 @@ constructor(
7976
private var hasClosed = false
8077
private var hasFailed = false
8178

82-
internal constructor(
83-
outputRecordCollector: Consumer<AirbyteMessage>,
84-
onStart: OnStartFunction,
85-
onClose: OnCloseFunction,
86-
flusher: DestinationFlushFunction,
87-
catalog: ConfiguredAirbyteCatalog,
88-
bufferManager: BufferManager,
89-
flushFailure: FlushFailure,
90-
defaultNamespace: Optional<String>,
91-
) : this(
92-
outputRecordCollector,
93-
onStart,
94-
onClose,
95-
flusher,
96-
catalog,
97-
bufferManager,
98-
defaultNamespace,
99-
flushFailure,
100-
Executors.newFixedThreadPool(5),
101-
AirbyteMessageDeserializer(),
102-
)
103-
10479
@Throws(Exception::class)
10580
override fun start() {
10681
Preconditions.checkState(!hasStarted, "Consumer has already been started.")
@@ -129,9 +104,6 @@ constructor(
129104
message,
130105
)
131106
if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) {
132-
if (Strings.isNullOrEmpty(partialAirbyteMessage.record?.namespace)) {
133-
partialAirbyteMessage.record?.namespace = defaultNamespace.getOrNull()
134-
}
135107
validateRecord(partialAirbyteMessage)
136108

137109
partialAirbyteMessage.record?.streamDescriptor?.let {
@@ -141,7 +113,6 @@ constructor(
141113
bufferEnqueue.addRecord(
142114
partialAirbyteMessage,
143115
sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES,
144-
defaultNamespace,
145116
)
146117
}
147118

@@ -159,10 +130,14 @@ constructor(
159130
bufferManager.close()
160131

161132
val streamSyncSummaries =
162-
streamNames.associateWith { streamDescriptor: StreamDescriptor ->
163-
StreamSyncSummary(
164-
Optional.of(getRecordCounter(streamDescriptor).get()),
165-
)
133+
streamNames.associate { streamDescriptor ->
134+
StreamDescriptorUtils.withDefaultNamespace(
135+
streamDescriptor,
136+
bufferManager.defaultNamespace,
137+
) to
138+
StreamSyncSummary(
139+
Optional.of(getRecordCounter(streamDescriptor).get()),
140+
)
166141
}
167142
onClose.accept(hasFailed, streamSyncSummaries)
168143

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt

+7
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,11 @@ object StreamDescriptorUtils {
3434

3535
return pairs
3636
}
37+
38+
fun withDefaultNamespace(sd: StreamDescriptor, defaultNamespace: String) =
39+
if (sd.namespace.isNullOrEmpty()) {
40+
StreamDescriptor().withName(sd.name).withNamespace(defaultNamespace)
41+
} else {
42+
sd
43+
}
3744
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt

+18-5
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ package io.airbyte.cdk.integrations.destination.async.buffers
77
import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager
88
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
99
import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager
10+
import io.airbyte.commons.json.Jsons
1011
import io.airbyte.protocol.models.v0.AirbyteMessage
1112
import io.airbyte.protocol.models.v0.StreamDescriptor
12-
import java.util.Optional
1313
import java.util.concurrent.ConcurrentMap
1414

1515
/**
@@ -20,6 +20,7 @@ class BufferEnqueue(
2020
private val memoryManager: GlobalMemoryManager,
2121
private val buffers: ConcurrentMap<StreamDescriptor, StreamAwareQueue>,
2222
private val stateManager: GlobalAsyncStateManager,
23+
private val defaultNamespace: String,
2324
) {
2425
/**
2526
* Buffer a record. Contains memory management logic to dynamically adjust queue size based via
@@ -31,12 +32,11 @@ class BufferEnqueue(
3132
fun addRecord(
3233
message: PartialAirbyteMessage,
3334
sizeInBytes: Int,
34-
defaultNamespace: Optional<String>,
3535
) {
3636
if (message.type == AirbyteMessage.Type.RECORD) {
3737
handleRecord(message, sizeInBytes)
3838
} else if (message.type == AirbyteMessage.Type.STATE) {
39-
stateManager.trackState(message, sizeInBytes.toLong(), defaultNamespace.orElse(""))
39+
stateManager.trackState(message, sizeInBytes.toLong())
4040
}
4141
}
4242

@@ -53,15 +53,28 @@ class BufferEnqueue(
5353
}
5454
val stateId = stateManager.getStateIdAndIncrementCounter(streamDescriptor)
5555

56-
var addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId)
56+
// We don't set the default namespace until after putting this message into the state
57+
// manager/etc.
58+
// All our internal handling is on the true (null) namespace;
59+
// we just set the default namespace when handing off to destination-specific code.
60+
val mangledMessage =
61+
if (message.record!!.namespace.isNullOrEmpty()) {
62+
val clone = Jsons.clone(message)
63+
clone.record!!.namespace = defaultNamespace
64+
clone
65+
} else {
66+
message
67+
}
68+
69+
var addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId)
5770

5871
var i = 0
5972
while (!addedToQueue) {
6073
val newlyAllocatedMemory = memoryManager.requestMemory()
6174
if (newlyAllocatedMemory > 0) {
6275
queue.addMaxMemory(newlyAllocatedMemory)
6376
}
64-
addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId)
77+
addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId)
6578
i++
6679
if (i > 5) {
6780
try {

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ private val logger = KotlinLogging.logger {}
2222
class BufferManager
2323
@JvmOverloads
2424
constructor(
25+
/**
26+
* This probably doesn't belong here, but it's the easiest place where both [BufferEnqueue] and
27+
* [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer] can get to it.
28+
*/
29+
public val defaultNamespace: String,
2530
maxMemory: Long = (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO).toLong(),
2631
) {
2732
@get:VisibleForTesting val buffers: ConcurrentMap<StreamDescriptor, StreamAwareQueue>
@@ -46,7 +51,7 @@ constructor(
4651
memoryManager = GlobalMemoryManager(maxMemory)
4752
this.stateManager = GlobalAsyncStateManager(memoryManager)
4853
buffers = ConcurrentHashMap()
49-
bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager)
54+
bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager, defaultNamespace)
5055
bufferDequeue = BufferDequeue(memoryManager, buffers, stateManager)
5156
debugLoop = Executors.newSingleThreadScheduledExecutor()
5257
debugLoop.scheduleAtFixedRate(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt

+2-29
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.integrations.destination.async.state
66

77
import com.google.common.base.Preconditions
8-
import com.google.common.base.Strings
98
import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager
109
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
1110
import io.airbyte.commons.json.Jsons
@@ -104,7 +103,6 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
104103
fun trackState(
105104
message: PartialAirbyteMessage,
106105
sizeInBytes: Long,
107-
defaultNamespace: String,
108106
) {
109107
if (preState) {
110108
convertToGlobalIfNeeded(message)
@@ -113,7 +111,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
113111
// stateType should not change after a conversion.
114112
Preconditions.checkArgument(stateType == extractStateType(message))
115113

116-
closeState(message, sizeInBytes, defaultNamespace)
114+
closeState(message, sizeInBytes)
117115
}
118116

119117
/**
@@ -323,10 +321,9 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
323321
private fun closeState(
324322
message: PartialAirbyteMessage,
325323
sizeInBytes: Long,
326-
defaultNamespace: String,
327324
) {
328325
val resolvedDescriptor: StreamDescriptor =
329-
extractStream(message, defaultNamespace)
326+
extractStream(message)
330327
.orElse(
331328
SENTINEL_GLOBAL_DESC,
332329
)
@@ -424,38 +421,14 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
424421
UUID.randomUUID().toString(),
425422
)
426423

427-
/**
428-
* If the user has selected the Destination Namespace as the Destination default while
429-
* setting up the connector, the platform sets the namespace as null in the StreamDescriptor
430-
* in the AirbyteMessages (both record and state messages). The destination checks that if
431-
* the namespace is empty or null, if yes then re-populates it with the defaultNamespace.
432-
* See [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept] But
433-
* destination only does this for the record messages. So when state messages arrive without
434-
* a namespace and since the destination doesn't repopulate it with the default namespace,
435-
* there is a mismatch between the StreamDescriptor from record messages and state messages.
436-
* That breaks the logic of the state management class as [descToStateIdQ] needs to have
437-
* consistent StreamDescriptor. This is why while trying to extract the StreamDescriptor
438-
* from state messages, we check if the namespace is null, if yes then replace it with
439-
* defaultNamespace to keep it consistent with the record messages.
440-
*/
441424
private fun extractStream(
442425
message: PartialAirbyteMessage,
443-
defaultNamespace: String,
444426
): Optional<StreamDescriptor> {
445427
if (
446428
message.state?.type != null &&
447429
message.state?.type == AirbyteStateMessage.AirbyteStateType.STREAM
448430
) {
449431
val streamDescriptor: StreamDescriptor? = message.state?.stream?.streamDescriptor
450-
if (Strings.isNullOrEmpty(streamDescriptor?.namespace)) {
451-
return Optional.of(
452-
StreamDescriptor()
453-
.withName(
454-
streamDescriptor?.name,
455-
)
456-
.withNamespace(defaultNamespace),
457-
)
458-
}
459432
return streamDescriptor?.let { Optional.of(it) } ?: Optional.empty()
460433
}
461434
return Optional.empty()

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt

-24
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.36.8
1+
version=0.37.0

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
3131
import java.io.IOException
3232
import java.math.BigDecimal
3333
import java.time.Instant
34-
import java.util.Optional
3534
import java.util.concurrent.Executors
3635
import java.util.concurrent.TimeUnit
3736
import java.util.concurrent.TimeoutException
@@ -60,7 +59,7 @@ class AsyncStreamConsumerTest {
6059
private val CATALOG: ConfiguredAirbyteCatalog =
6160
ConfiguredAirbyteCatalog()
6261
.withStreams(
63-
java.util.List.of(
62+
listOf(
6463
CatalogHelpers.createConfiguredAirbyteStream(
6564
STREAM_NAME,
6665
SCHEMA_NAME,
@@ -145,9 +144,8 @@ class AsyncStreamConsumerTest {
145144
onClose = onClose,
146145
onFlush = flushFunction,
147146
catalog = CATALOG,
148-
bufferManager = BufferManager(),
147+
bufferManager = BufferManager("default_ns"),
149148
flushFailure = flushFailure,
150-
defaultNamespace = Optional.of("default_ns"),
151149
airbyteMessageDeserializer = airbyteMessageDeserializer,
152150
workerPool = Executors.newFixedThreadPool(5),
153151
)
@@ -264,9 +262,8 @@ class AsyncStreamConsumerTest {
264262
Mockito.mock(OnCloseFunction::class.java),
265263
flushFunction,
266264
CATALOG,
267-
BufferManager((1024 * 10).toLong()),
265+
BufferManager("default_ns", (1024 * 10).toLong()),
268266
flushFailure,
269-
Optional.of("default_ns"),
270267
)
271268
Mockito.`when`(flushFunction.optimalBatchSizeBytes).thenReturn(0L)
272269

0 commit comments

Comments
 (0)