Skip to content

Commit e9a37d4

Browse files
S3 Dest emits V2 fields and captures failures (#42409)
1 parent ba88195 commit e9a37d4

File tree

85 files changed

+5055
-725
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+5055
-725
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+288-287
Large diffs are not rendered by default.

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ constructor(
2626
* This probably doesn't belong here, but it's the easiest place where both [BufferEnqueue] and
2727
* [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer] can both get to it.
2828
*/
29-
public val defaultNamespace: String?,
29+
val defaultNamespace: String?,
3030
maxMemory: Long = (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO).toLong(),
3131
) {
3232
@get:VisibleForTesting val buffers: ConcurrentMap<StreamDescriptor, StreamAwareQueue>

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
4242
*/
4343
@Deprecated("")
4444
@Throws(IOException::class)
45-
protected abstract fun writeRecord(record: AirbyteRecordMessage)
45+
protected abstract fun writeRecord(
46+
record: AirbyteRecordMessage,
47+
generationId: Long = 0,
48+
syncId: Long = 0
49+
)
4650

4751
/**
4852
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire
@@ -80,7 +84,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
8084

8185
@Deprecated("")
8286
@Throws(Exception::class)
83-
override fun accept(record: AirbyteRecordMessage): Long {
87+
override fun accept(record: AirbyteRecordMessage, generationId: Long, syncId: Long): Long {
8488
if (!isStarted) {
8589
if (useCompression) {
8690
compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -92,7 +96,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
9296
}
9397
if (inputStream == null && !isClosed) {
9498
val startCount = byteCounter.count
95-
@Suppress("deprecation") writeRecord(record)
99+
@Suppress("deprecation") writeRecord(record, generationId, syncId)
96100
return byteCounter.count - startCount
97101
} else {
98102
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ interface BufferingStrategy : AutoCloseable {
2929
@Throws(Exception::class)
3030
fun addRecord(
3131
stream: AirbyteStreamNameNamespacePair,
32-
message: AirbyteMessage
32+
message: AirbyteMessage,
33+
generationId: Long = 0,
34+
syncId: Long = 0
3335
): Optional<BufferFlushType>
3436

3537
/** Flush the buffered messages from a single stream */

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ class InMemoryRecordBufferingStrategy(
4141
@Throws(Exception::class)
4242
override fun addRecord(
4343
stream: AirbyteStreamNameNamespacePair,
44-
message: AirbyteMessage
44+
message: AirbyteMessage,
45+
generationId: Long,
46+
syncId: Long
4547
): Optional<BufferFlushType> {
4648
var flushed: Optional<BufferFlushType> = Optional.empty()
4749

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ interface SerializableBuffer : AutoCloseable {
2424
* @param record [AirbyteRecordMessage] to be added to buffer
2525
* @return number of bytes written to the buffer
2626
*/
27-
@Deprecated("") @Throws(Exception::class) fun accept(record: AirbyteRecordMessage): Long
27+
@Deprecated("")
28+
@Throws(Exception::class)
29+
fun accept(record: AirbyteRecordMessage, generationId: Long = 0, syncId: Long = 0): Long
2830

2931
/**
3032
* TODO: (ryankfu) Move all destination connectors to pass the serialized record string instead

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,15 @@ class SerializedBufferingStrategy
4747
override fun addRecord(
4848
stream: AirbyteStreamNameNamespacePair,
4949
message: AirbyteMessage,
50+
generationId: Long,
51+
syncId: Long
5052
): Optional<BufferFlushType> {
5153
var flushed: Optional<BufferFlushType> = Optional.empty()
5254

5355
val buffer = getOrCreateBuffer(stream)
5456

55-
@Suppress("DEPRECATION") val actualMessageSizeInBytes = buffer.accept(message.record)
57+
@Suppress("DEPRECATION")
58+
val actualMessageSizeInBytes = buffer.accept(message.record, generationId, syncId)
5659
totalBufferSizeInBytes += actualMessageSizeInBytes
5760
// Flushes buffer when either the buffer was completely filled or only a single stream was
5861
// filled
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.44.9
1+
version=0.44.12

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test
2424
import org.mockito.ArgumentMatchers
2525
import org.mockito.Mockito
2626
import org.mockito.kotlin.any
27+
import org.mockito.kotlin.eq
2728
import org.mockito.kotlin.mock
2829

2930
class BufferedStreamConsumerTest {
@@ -361,7 +362,7 @@ class BufferedStreamConsumerTest {
361362
// The first two records that we push will not trigger any flushes, but the third record
362363
// _will_
363364
// trigger a flush
364-
Mockito.`when`(strategy.addRecord(any(), any()))
365+
Mockito.`when`(strategy.addRecord(any(), any(), eq(0), eq(0)))
365366
.thenReturn(
366367
Optional.empty(),
367368
Optional.empty(),
@@ -463,7 +464,7 @@ class BufferedStreamConsumerTest {
463464
// The first two records that we push will not trigger any flushes, but the third record
464465
// _will_
465466
// trigger a flush
466-
Mockito.`when`(strategy.addRecord(any(), any()))
467+
Mockito.`when`(strategy.addRecord(any(), any(), eq(0), eq(0)))
467468
.thenReturn(
468469
Optional.empty(),
469470
Optional.empty(),

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.junit.jupiter.api.BeforeEach
1414
import org.junit.jupiter.api.Test
1515
import org.mockito.Mockito
1616
import org.mockito.kotlin.any
17+
import org.mockito.kotlin.eq
1718

1819
class SerializedBufferingStrategyTest {
1920
private val catalog: ConfiguredAirbyteCatalog =
@@ -37,7 +38,7 @@ class SerializedBufferingStrategyTest {
3738

3839
@Throws(Exception::class)
3940
private fun setupMock(mockObject: SerializableBuffer) {
40-
Mockito.`when`(mockObject.accept(any())).thenReturn(10L)
41+
Mockito.`when`(mockObject.accept(any(), eq(0L), eq(0L))).thenReturn(10L)
4142
Mockito.`when`(mockObject.byteCount).thenReturn(10L)
4243
Mockito.`when`(mockObject.maxTotalBufferSizeInBytes).thenReturn(MAX_TOTAL_BUFFER_SIZE_BYTES)
4344
Mockito.`when`(mockObject.maxPerStreamBufferSizeInBytes)

0 commit comments

Comments
 (0)