Skip to content

Commit 8957119

Browse files
destination-s3: add file transfer (#46302)
Co-authored-by: benmoriceau <[email protected]>
1 parent ed03d4e commit 8957119

File tree

29 files changed

+775
-57
lines changed

29 files changed

+775
-57
lines changed

airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
88
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
99
import io.airbyte.cdk.load.test.util.NoopNameMapper
1010
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
11+
import org.junit.jupiter.api.Disabled
1112
import org.junit.jupiter.api.Test
1213

1314
class MockBasicFunctionalityIntegrationTest :
@@ -27,6 +28,7 @@ class MockBasicFunctionalityIntegrationTest :
2728
}
2829

2930
@Test
31+
@Disabled
3032
override fun testMidSyncCheckpointingStreamState() {
3133
super.testMidSyncCheckpointingStreamState()
3234
}

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.48.0 | 2024-10-23 | [\#46302](https://github.com/airbytehq/airbyte/pull/46302) | Add support for file transfer |
177178
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest|
178179
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibility|
179180
| 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas|

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ constructor(
5454
workerPool: ExecutorService = Executors.newFixedThreadPool(5),
5555
private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
5656
AirbyteMessageDeserializer(),
57+
flushOnEveryMessage: Boolean = false,
5758
) : SerializedAirbyteMessageConsumer {
5859
private val bufferEnqueue: BufferEnqueue = bufferManager.bufferEnqueue
5960
private val flushWorkers: FlushWorkers =
@@ -64,6 +65,7 @@ constructor(
6465
flushFailure,
6566
bufferManager.stateManager,
6667
workerPool,
68+
flushOnEveryMessage,
6769
)
6870
private val streamNames: Set<StreamDescriptor> =
6971
StreamDescriptorUtils.fromConfiguredCatalog(

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt

+13-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ internal constructor(
2828
private val isClosing: AtomicBoolean,
2929
private val flusher: DestinationFlushFunction,
3030
private val nowProvider: Clock,
31+
private val flushOnEveryMessage: Boolean = false,
3132
) {
3233
private val latestFlushTimeMsPerStream: ConcurrentMap<StreamDescriptor, Long> =
3334
ConcurrentHashMap()
@@ -37,7 +38,15 @@ internal constructor(
3738
runningFlushWorkers: RunningFlushWorkers,
3839
isClosing: AtomicBoolean,
3940
flusher: DestinationFlushFunction,
40-
) : this(bufferDequeue, runningFlushWorkers, isClosing, flusher, Clock.systemUTC())
41+
flushOnEveryMessage: Boolean = false,
42+
) : this(
43+
bufferDequeue,
44+
runningFlushWorkers,
45+
isClosing,
46+
flusher,
47+
Clock.systemUTC(),
48+
flushOnEveryMessage
49+
)
4150

4251
val nextStreamToFlush: Optional<StreamDescriptor>
4352
/**
@@ -70,7 +79,8 @@ internal constructor(
7079
bufferDequeue.totalGlobalQueueSizeBytes.toDouble() / bufferDequeue.maxQueueSizeBytes
7180
// when we are closing or queues are very full, flush regardless of how few items are in the
7281
// queue.
73-
return if (isClosing.get() || isBuffer90Full) 0 else flusher.queueFlushThresholdBytes
82+
return if (flushOnEveryMessage || isClosing.get() || isBuffer90Full) 0
83+
else flusher.queueFlushThresholdBytes
7484
}
7585

7686
// todo (cgardens) - improve prioritization by getting a better estimate of how much data
@@ -105,7 +115,7 @@ internal constructor(
105115
"${isTimeTriggeredResult.second} , ${isSizeTriggeredResult.second}"
106116
logger.debug { "computed: $debugString" }
107117

108-
if (isSizeTriggeredResult.first || isTimeTriggeredResult.first) {
118+
if (flushOnEveryMessage || isSizeTriggeredResult.first || isTimeTriggeredResult.first) {
109119
logger.info { "flushing: $debugString" }
110120
latestFlushTimeMsPerStream[stream] = nowProvider.millis()
111121
return Optional.of(stream)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/FlushWorkers.kt

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ constructor(
5151
private val flushFailure: FlushFailure,
5252
private val stateManager: GlobalAsyncStateManager,
5353
private val workerPool: ExecutorService = Executors.newFixedThreadPool(5),
54+
flushOnEveryMessage: Boolean = false,
5455
) : AutoCloseable {
5556
private val supervisorThread: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
5657
private val debugLoop: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
@@ -66,6 +67,7 @@ constructor(
6667
runningFlushWorkers,
6768
isClosing,
6869
flusher,
70+
flushOnEveryMessage,
6971
)
7072
}
7173

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class BufferDequeue(
6363

6464
// otherwise pull records until we hit the memory limit.
6565
val newSize: Long = (memoryItem.size) + bytesRead.get()
66-
if (newSize <= optimalBytesToRead) {
66+
if (newSize <= optimalBytesToRead || output.isEmpty()) {
6767
memoryItem.size.let { bytesRead.addAndGet(it) }
6868
queue.poll()?.item?.let { output.add(it) }
6969
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.destination.async.model
6+
7+
import com.fasterxml.jackson.annotation.JsonProperty
8+
9+
class AirbyteRecordMessageFile {
10+
constructor(
11+
fileUrl: String? = null,
12+
bytes: Long? = null,
13+
fileRelativePath: String? = null,
14+
modified: Long? = null,
15+
sourceFileUrl: String? = null
16+
) {
17+
this.fileUrl = fileUrl
18+
this.bytes = bytes
19+
this.fileRelativePath = fileRelativePath
20+
this.modified = modified
21+
this.sourceFileUrl = sourceFileUrl
22+
}
23+
constructor() :
24+
this(
25+
fileUrl = null,
26+
bytes = null,
27+
fileRelativePath = null,
28+
modified = null,
29+
sourceFileUrl = null
30+
)
31+
32+
@get:JsonProperty("file_url")
33+
@set:JsonProperty("file_url")
34+
@JsonProperty("file_url")
35+
var fileUrl: String? = null
36+
37+
@get:JsonProperty("bytes")
38+
@set:JsonProperty("bytes")
39+
@JsonProperty("bytes")
40+
var bytes: Long? = null
41+
42+
@get:JsonProperty("file_relative_path")
43+
@set:JsonProperty("file_relative_path")
44+
@JsonProperty("file_relative_path")
45+
var fileRelativePath: String? = null
46+
47+
@get:JsonProperty("modified")
48+
@set:JsonProperty("modified")
49+
@JsonProperty("modified")
50+
var modified: Long? = null
51+
52+
@get:JsonProperty("source_file_url")
53+
@set:JsonProperty("source_file_url")
54+
@JsonProperty("source_file_url")
55+
var sourceFileUrl: String? = null
56+
}

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt

+15-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.integrations.destination.async.model
66

77
import com.fasterxml.jackson.annotation.JsonProperty
8-
import com.fasterxml.jackson.annotation.JsonPropertyDescription
98
import com.fasterxml.jackson.databind.JsonNode
109
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
1110
import io.airbyte.protocol.models.v0.StreamDescriptor
@@ -33,14 +32,18 @@ class PartialAirbyteRecordMessage {
3332
@get:JsonProperty("emitted_at")
3433
@set:JsonProperty("emitted_at")
3534
@JsonProperty("emitted_at")
36-
@JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.")
3735
var emittedAt: Long = 0
3836

3937
@get:JsonProperty("meta")
4038
@set:JsonProperty("meta")
4139
@JsonProperty("meta")
4240
var meta: AirbyteRecordMessageMeta? = null
4341

42+
@get:JsonProperty("file")
43+
@set:JsonProperty("file")
44+
@JsonProperty("file")
45+
var file: AirbyteRecordMessageFile? = null
46+
4447
fun withNamespace(namespace: String?): PartialAirbyteRecordMessage {
4548
this.namespace = namespace
4649
return this
@@ -66,6 +69,11 @@ class PartialAirbyteRecordMessage {
6669
return this
6770
}
6871

72+
fun withFile(file: AirbyteRecordMessageFile): PartialAirbyteRecordMessage {
73+
this.file = file
74+
return this
75+
}
76+
6977
override fun equals(other: Any?): Boolean {
7078
if (this === other) {
7179
return true
@@ -77,7 +85,8 @@ class PartialAirbyteRecordMessage {
7785
return namespace == that.namespace &&
7886
stream == that.stream &&
7987
emittedAt == that.emittedAt &&
80-
meta == that.meta
88+
meta == that.meta &&
89+
file == that.file
8190
}
8291

8392
override fun hashCode(): Int {
@@ -98,6 +107,9 @@ class PartialAirbyteRecordMessage {
98107
", meta='" +
99108
meta +
100109
'\'' +
110+
", file='" +
111+
file +
112+
'\'' +
101113
'}'
102114
}
103115

Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.47.3
1+
version=0.48.1

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,9 @@ class AsyncStreamConsumerTest {
765765
val throwable =
766766
assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) }
767767
// Ensure that the offending data has been scrubbed from the error message
768-
assertFalse(throwable.message!!.contains(offender))
768+
assertFalse(
769+
throwable.message!!.contains(offender),
770+
"message should not contain the offender. Was ${throwable.message}"
771+
)
769772
}
770773
}

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlushTest.kt

+47-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,13 @@ class DetectStreamToFlushTest {
4949
)
5050

5151
val detect =
52-
DetectStreamToFlush(bufferDequeue, runningFlushWorkers, AtomicBoolean(false), flusher)
52+
DetectStreamToFlush(
53+
bufferDequeue,
54+
runningFlushWorkers,
55+
AtomicBoolean(false),
56+
flusher,
57+
flushOnEveryMessage = false
58+
)
5359
Assertions.assertEquals(Optional.empty<Any>(), detect.getNextStreamToFlush(0))
5460
}
5561

@@ -66,7 +72,13 @@ class DetectStreamToFlushTest {
6672
RunningFlushWorkers::class.java,
6773
)
6874
val detect =
69-
DetectStreamToFlush(bufferDequeue, runningFlushWorkers, AtomicBoolean(false), flusher)
75+
DetectStreamToFlush(
76+
bufferDequeue,
77+
runningFlushWorkers,
78+
AtomicBoolean(false),
79+
flusher,
80+
flushOnEveryMessage = false
81+
)
7082
// if above threshold, triggers
7183
Assertions.assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0))
7284
// if below threshold, no trigger
@@ -94,10 +106,41 @@ class DetectStreamToFlushTest {
94106
),
95107
)
96108
val detect =
97-
DetectStreamToFlush(bufferDequeue, runningFlushWorkers, AtomicBoolean(false), flusher)
109+
DetectStreamToFlush(
110+
bufferDequeue,
111+
runningFlushWorkers,
112+
AtomicBoolean(false),
113+
flusher,
114+
flushOnEveryMessage = false
115+
)
98116
Assertions.assertEquals(Optional.empty<Any>(), detect.getNextStreamToFlush(0))
99117
}
100118

119+
@Test
120+
internal fun testFileTransfer() {
121+
val bufferDequeue =
122+
Mockito.mock(
123+
BufferDequeue::class.java,
124+
)
125+
Mockito.`when`(bufferDequeue.bufferedStreams).thenReturn(setOf(DESC1))
126+
Mockito.`when`(bufferDequeue.getQueueSizeBytes(DESC1)).thenReturn(Optional.of(0L))
127+
val runningFlushWorkers =
128+
Mockito.mock(
129+
RunningFlushWorkers::class.java,
130+
)
131+
132+
val detect =
133+
DetectStreamToFlush(
134+
bufferDequeue,
135+
runningFlushWorkers,
136+
AtomicBoolean(false),
137+
flusher,
138+
flushOnEveryMessage = true
139+
)
140+
Assertions.assertEquals(0, detect.computeQueueThreshold())
141+
Assertions.assertEquals(Optional.of(DESC1), detect.getNextStreamToFlush(0))
142+
}
143+
101144
@Test
102145
internal fun testGetNextPicksUpOnTimeTrigger() {
103146
val bufferDequeue =
@@ -127,6 +170,7 @@ class DetectStreamToFlushTest {
127170
AtomicBoolean(false),
128171
flusher,
129172
mockedNowProvider,
173+
flushOnEveryMessage = false
130174
)
131175

132176
// initialize flush time

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import io.airbyte.configoss.WorkerDestinationConfig
1414
import io.airbyte.protocol.models.v0.AirbyteMessage
1515
import io.airbyte.protocol.models.v0.AirbyteStateStats
1616
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
17+
import io.airbyte.workers.exception.TestHarnessException
1718
import io.airbyte.workers.helper.ConnectorConfigUpdater
1819
import io.airbyte.workers.internal.AirbyteDestination
1920
import io.airbyte.workers.internal.DefaultAirbyteDestination
@@ -215,7 +216,11 @@ abstract class BaseDestinationAcceptanceTest(
215216
}
216217
}
217218

218-
destination.close()
219+
try {
220+
destination.close()
221+
} catch (e: TestHarnessException) {
222+
throw TestHarnessException(e.message, e, destinationOutput)
223+
}
219224

220225
return destinationOutput
221226
}
@@ -258,6 +263,7 @@ abstract class BaseDestinationAcceptanceTest(
258263
workspaceRoot,
259264
workspaceRoot.toString(),
260265
localRoot.toString(),
266+
fileTransferMountSource,
261267
"host",
262268
getConnectorEnv()
263269
)

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest {
117117
workspaceRoot,
118118
workspaceRoot.toString(),
119119
localRoot.toString(),
120+
fileTransferMountSource = null,
120121
"host",
121122
envMap
122123
)

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt

+14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package io.airbyte.commons.features
55

66
import io.github.oshai.kotlinlogging.KotlinLogging
7+
import java.nio.file.Path
78
import java.util.function.Function
89

910
private val log = KotlinLogging.logger {}
@@ -46,6 +47,16 @@ class EnvVariableFeatureFlags : FeatureFlags {
4647
return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg }
4748
}
4849

50+
override fun airbyteStagingDirectory(): Path? {
51+
return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, null) { arg: String ->
52+
Path.of(arg)
53+
}
54+
}
55+
56+
override fun useFileTransfer(): Boolean {
57+
return getEnvOrDefault(USE_FILE_TRANSFER, false) { it.toBoolean() }
58+
}
59+
4960
// TODO: refactor in order to use the same method as the ones in EnvConfigs.java
5061
fun <T> getEnvOrDefault(key: String?, defaultValue: T, parser: Function<String, T>): T {
5162
val value = System.getenv(key)
@@ -73,5 +84,8 @@ class EnvVariableFeatureFlags : FeatureFlags {
7384
const val STRICT_COMPARISON_NORMALIZATION_TAG: String =
7485
"STRICT_COMPARISON_NORMALIZATION_TAG"
7586
const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE"
87+
val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files")
88+
const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY"
89+
const val USE_FILE_TRANSFER = "USE_FILE_TRANSFER"
7690
}
7791
}

0 commit comments

Comments
 (0)