Skip to content

Commit 0782ea4

Browse files
Destination-S3: Enable optional fallback to legacy java SDK
1 parent 2286605 commit 0782ea4

File tree

6 files changed

+333
-6
lines changed

6 files changed

+333
-6
lines changed

airbyte-cdk/bulk/toolkits/load-s3/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ dependencies {
44
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-aws')
55
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage')
66

7+
// For legacy AWS Java SDK Support
8+
api 'com.amazonaws:aws-java-sdk-s3:1.12.772'
9+
api 'com.amazonaws:aws-java-sdk-sts:1.12.772'
10+
711
testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage")))
812
implementation("aws.sdk.kotlin:s3:1.3.98")
913
implementation("aws.smithy.kotlin:http-client-engine-crt:1.3.31")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.command.s3
6+
7+
/**
8+
* Add this to your destination configuration to provide S3 client configuration.
9+
*
10+
* Currently, this is optional and only serves to enable
11+
* [io.airbyte.cdk.load.file.s3.S3LegacyJavaClient].
12+
*/
13+
interface S3ClientConfigurationProvider {
14+
val s3ClientConfiguration: S3ClientConfiguration
15+
}
16+
17+
data class S3ClientConfiguration(val useLegacyJavaClient: Boolean = false)

airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt

+36-6
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
2424
import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfigurationProvider
2525
import io.airbyte.cdk.load.command.aws.AWSArnRoleConfigurationProvider
2626
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
27-
import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
2827
import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
2928
import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
3029
import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider
30+
import io.airbyte.cdk.load.command.s3.S3ClientConfigurationProvider
3131
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
3232
import io.airbyte.cdk.load.file.object_storage.RemoteObject
3333
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
@@ -44,12 +44,18 @@ data class S3Object(override val key: String, override val storageConfig: S3Buck
4444
get() = "${storageConfig.s3BucketName}/$key"
4545
}
4646

47+
interface S3Client : ObjectStorageClient<S3Object>
48+
49+
/**
50+
* The primary and recommended S3 client implementation -- kotlin-friendly with suspend functions.
51+
* However, there's a bug that can cause hard failures under high-concurrency. (Partial workaround
52+
* in place https://github.com/awslabs/aws-sdk-kotlin/issues/1214#issuecomment-2464831817).
53+
*/
4754
@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
48-
class S3Client(
55+
class S3KotlinClient(
4956
private val client: aws.sdk.kotlin.services.s3.S3Client,
5057
val bucketConfig: S3BucketConfiguration,
51-
private val uploadConfig: ObjectStorageUploadConfiguration?,
52-
) : ObjectStorageClient<S3Object> {
58+
) : S3Client {
5359
private val log = KotlinLogging.logger {}
5460

5561
override suspend fun list(prefix: String) = flow {
@@ -159,7 +165,10 @@ class S3ClientFactory(
159165
private val bucketConfig: S3BucketConfigurationProvider,
160166
private val uploadConfig: ObjectStorageUploadConfigurationProvider? = null,
161167
private val assumeRoleCredentials: AwsAssumeRoleCredentials?,
168+
private val s3ClientConfig: S3ClientConfigurationProvider? = null,
162169
) {
170+
private val log = KotlinLogging.logger {}
171+
163172
companion object {
164173
const val AIRBYTE_STS_SESSION_NAME = "airbyte-sts-session"
165174

@@ -174,6 +183,28 @@ class S3ClientFactory(
174183
@Singleton
175184
@Secondary
176185
fun make(): S3Client {
186+
if (s3ClientConfig?.s3ClientConfiguration?.useLegacyJavaClient == true) {
187+
log.info { "Creating S3 client using legacy Java SDK" }
188+
return if (
189+
arnRole.awsArnRoleConfiguration.roleArn != null && assumeRoleCredentials != null
190+
) {
191+
S3LegacyJavaClientFactory()
192+
.createFromAssumeRole(
193+
arnRole.awsArnRoleConfiguration,
194+
assumeRoleCredentials,
195+
bucketConfig.s3BucketConfiguration
196+
)
197+
} else {
198+
S3LegacyJavaClientFactory()
199+
.createFromAccessKey(
200+
keyConfig.awsAccessKeyConfiguration,
201+
bucketConfig.s3BucketConfiguration
202+
)
203+
}
204+
}
205+
206+
log.info { "Creating S3 client using Kotlin SDK" }
207+
177208
val credsProvider: CredentialsProvider =
178209
if (keyConfig.awsAccessKeyConfiguration.accessKeyId != null) {
179210
StaticCredentialsProvider {
@@ -216,10 +247,9 @@ class S3ClientFactory(
216247
httpClient(CrtHttpEngine)
217248
}
218249

219-
return S3Client(
250+
return S3KotlinClient(
220251
s3SdkClient,
221252
bucketConfig.s3BucketConfiguration,
222-
uploadConfig?.objectStorageUploadConfiguration
223253
)
224254
}
225255
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.file.s3
6+
7+
import com.amazonaws.ClientConfiguration
8+
import com.amazonaws.Protocol
9+
import com.amazonaws.auth.AWSCredentials
10+
import com.amazonaws.auth.AWSStaticCredentialsProvider
11+
import com.amazonaws.auth.BasicAWSCredentials
12+
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider
13+
import com.amazonaws.client.builder.AwsClientBuilder
14+
import com.amazonaws.endpointdiscovery.DaemonThreadFactory
15+
import com.amazonaws.regions.Regions
16+
import com.amazonaws.retry.RetryMode
17+
import com.amazonaws.services.s3.AmazonS3
18+
import com.amazonaws.services.s3.AmazonS3ClientBuilder
19+
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest
20+
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest
21+
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult
22+
import com.amazonaws.services.s3.model.ObjectMetadata
23+
import com.amazonaws.services.s3.model.PartETag
24+
import com.amazonaws.services.s3.model.UploadPartRequest
25+
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
26+
import io.airbyte.cdk.load.command.aws.AWSAccessKeyConfiguration
27+
import io.airbyte.cdk.load.command.aws.AWSArnRoleConfiguration
28+
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
29+
import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
30+
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
31+
import io.airbyte.cdk.load.file.s3.S3ClientFactory.Companion.AIRBYTE_STS_SESSION_NAME
32+
import io.airbyte.cdk.load.util.setOnce
33+
import io.github.oshai.kotlinlogging.KotlinLogging
34+
import java.io.InputStream
35+
import java.util.concurrent.Executors
36+
import java.util.concurrent.atomic.AtomicBoolean
37+
import kotlinx.coroutines.flow.Flow
38+
import kotlinx.coroutines.flow.asFlow
39+
import org.apache.mina.util.ConcurrentHashSet
40+
41+
/**
42+
* A Java S3 SDK that is scheduled to be deprecated at the end of 2025. It was used by the old CDK
43+
* and lacks a bug that (sometimes) prevents the new Kotlin SDK from running with max concurrency.
44+
* (See https://github.com/awslabs/aws-sdk-kotlin/issues/1214#issuecomment-2464831817.)
45+
*
46+
* This can be enabled by injecting [io.airbyte.cdk.load.command.s3.S3ClientConfigurationProvider]
47+
* with [io.airbyte.cdk.load.command.s3.S3ClientConfiguration.useLegacyJavaClient] set to `true`.
48+
*
49+
* Currently this exists only to facilitate performance testing, but it may be needed as a fallback
50+
* if the new SDK cannot meet performance requirements.
51+
*/
52+
class S3LegacyJavaClient(val amazonS3: AmazonS3, val bucket: S3BucketConfiguration) : S3Client {
53+
override suspend fun list(prefix: String): Flow<S3Object> {
54+
return amazonS3
55+
.listObjectsV2(bucket.s3BucketName, prefix)
56+
.objectSummaries
57+
.map { S3Object(it.key, bucket) }
58+
.asFlow()
59+
}
60+
61+
override suspend fun move(remoteObject: S3Object, toKey: String): S3Object {
62+
amazonS3.copyObject(bucket.s3BucketName, remoteObject.key, bucket.s3BucketName, toKey)
63+
amazonS3.deleteObject(bucket.s3BucketName, remoteObject.key)
64+
65+
return S3Object(toKey, bucket)
66+
}
67+
68+
override suspend fun move(key: String, toKey: String): S3Object {
69+
return move(S3Object(key, bucket), toKey)
70+
}
71+
72+
override suspend fun <U> get(key: String, block: (InputStream) -> U): U {
73+
val obj = amazonS3.getObject(bucket.s3BucketName, key)
74+
return obj.objectContent.use(block)
75+
}
76+
77+
override suspend fun getMetadata(key: String): Map<String, String> {
78+
val obj = amazonS3.getObjectMetadata(bucket.s3BucketName, key)
79+
return obj.userMetadata
80+
}
81+
82+
override suspend fun put(key: String, bytes: ByteArray): S3Object {
83+
amazonS3.putObject(bucket.s3BucketName, key, bytes.inputStream(), null)
84+
return S3Object(key, bucket)
85+
}
86+
87+
override suspend fun delete(remoteObject: S3Object) {
88+
amazonS3.deleteObject(bucket.s3BucketName, remoteObject.key)
89+
}
90+
91+
override suspend fun delete(key: String) {
92+
amazonS3.deleteObject(bucket.s3BucketName, key)
93+
}
94+
95+
override suspend fun startStreamingUpload(
96+
key: String,
97+
metadata: Map<String, String>
98+
): StreamingUpload<S3Object> {
99+
val request =
100+
InitiateMultipartUploadRequest(bucket.s3BucketName, key)
101+
.withObjectMetadata(ObjectMetadata().also { it.userMetadata = metadata })
102+
val response = amazonS3.initiateMultipartUpload(request)
103+
return S3LegacyJavaStreamingUpload(amazonS3, bucket, response)
104+
}
105+
}
106+
107+
class S3LegacyJavaStreamingUpload(
108+
private val amazonS3: AmazonS3,
109+
private val bucket: S3BucketConfiguration,
110+
private val response: InitiateMultipartUploadResult
111+
) : StreamingUpload<S3Object> {
112+
private val log = KotlinLogging.logger {}
113+
114+
private val completed = AtomicBoolean(false)
115+
private val eTags = ConcurrentHashSet<PartETag>()
116+
117+
override suspend fun uploadPart(part: ByteArray, index: Int) {
118+
log.info { "Uploading part $index to ${response.key} (uploadId=${response.uploadId})" }
119+
val uploadPartRequest =
120+
UploadPartRequest()
121+
.withPartNumber(index)
122+
.withUploadId(response.uploadId)
123+
.withBucketName(bucket.s3BucketName)
124+
.withKey(response.key)
125+
.withInputStream(part.inputStream())
126+
.withPartSize(part.size.toLong())
127+
128+
val response = amazonS3.uploadPart(uploadPartRequest)
129+
eTags.add(response.partETag)
130+
}
131+
132+
override suspend fun complete(): S3Object {
133+
if (!completed.setOnce()) {
134+
log.warn {
135+
"Multipart upload already completed to ${response.key} (uploadId=${response.uploadId})"
136+
}
137+
} else if (eTags.isEmpty()) {
138+
log.warn { "Ignoring empty upload to ${response.key} (uploadId=${response.uploadId})" }
139+
} else {
140+
log.info {
141+
"Completing multipart upload to ${response.key} (uploadId=${response.uploadId})"
142+
}
143+
val completeMultipartUploadRequest =
144+
CompleteMultipartUploadRequest()
145+
.withUploadId(response.uploadId)
146+
.withPartETags(eTags.toList().sortedBy { it.partNumber })
147+
.withBucketName(bucket.s3BucketName)
148+
.withKey(response.key)
149+
amazonS3.completeMultipartUpload(completeMultipartUploadRequest)
150+
}
151+
152+
return S3Object(response.key, bucket)
153+
}
154+
}
155+
156+
class S3LegacyJavaClientFactory {
157+
private val clientBuilder = AmazonS3ClientBuilder.standard()
158+
159+
fun createFromAssumeRole(
160+
role: AWSArnRoleConfiguration,
161+
creds: AwsAssumeRoleCredentials,
162+
bucket: S3BucketConfiguration
163+
): S3LegacyJavaClient {
164+
val provider =
165+
STSAssumeRoleSessionCredentialsProvider.Builder(role.roleArn, AIRBYTE_STS_SESSION_NAME)
166+
.withExternalId(creds.externalId)
167+
.withStsClient(
168+
AWSSecurityTokenServiceClient.builder()
169+
.withRegion(Regions.DEFAULT_REGION)
170+
.withCredentials(
171+
AWSStaticCredentialsProvider(
172+
BasicAWSCredentials(creds.accessKey, creds.secretKey)
173+
)
174+
)
175+
.build()
176+
)
177+
.withAsyncRefreshExecutor(Executors.newSingleThreadExecutor(DaemonThreadFactory()))
178+
.build()
179+
val amazonS3 =
180+
clientBuilder
181+
.withCredentials(provider)
182+
.withRegion(bucket.s3BucketRegion.region)
183+
// the SDK defaults to RetryMode.LEGACY
184+
// (https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html)
185+
// this _can_ be configured via environment variable, but it seems more reliable
186+
// to
187+
// configure it
188+
// programmatically
189+
.withClientConfiguration(ClientConfiguration().withRetryMode(RetryMode.STANDARD))
190+
.build()
191+
return S3LegacyJavaClient(amazonS3, bucket)
192+
}
193+
194+
fun createFromAccessKey(
195+
keys: AWSAccessKeyConfiguration,
196+
bucket: S3BucketConfiguration
197+
): S3Client {
198+
val awsCreds: AWSCredentials = BasicAWSCredentials(keys.accessKeyId, keys.secretAccessKey)
199+
val provider = AWSStaticCredentialsProvider(awsCreds)
200+
val builder = clientBuilder.withCredentials(provider)
201+
val amazonS3 =
202+
if (bucket.s3Endpoint.isNullOrEmpty()) {
203+
builder.withRegion(bucket.s3BucketRegion.region)
204+
} else {
205+
val clientConfiguration = ClientConfiguration().withProtocol(Protocol.HTTPS)
206+
clientConfiguration.signerOverride = "AWSS3V4SignerType"
207+
208+
builder
209+
.withEndpointConfiguration(
210+
AwsClientBuilder.EndpointConfiguration(
211+
bucket.s3Endpoint,
212+
bucket.s3BucketRegion.region
213+
)
214+
)
215+
.withPathStyleAccessEnabled(true)
216+
.withClientConfiguration(clientConfiguration)
217+
}
218+
.build()
219+
return S3LegacyJavaClient(amazonS3, bucket)
220+
}
221+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.s3_v2
6+
7+
import io.airbyte.cdk.load.write.BasicPerformanceTest
8+
import org.junit.jupiter.api.Disabled
9+
import org.junit.jupiter.api.Test
10+
11+
@Disabled("We don't want this to run in CI")
12+
class S3V2JsonNoFrillsPerformanceTest :
13+
BasicPerformanceTest(
14+
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH),
15+
configSpecClass = S3V2Specification::class.java,
16+
defaultRecordsToInsert = 1_000_000,
17+
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES
18+
) {
19+
@Test
20+
override fun testInsertRecords() {
21+
super.testInsertRecords()
22+
}
23+
24+
@Test
25+
override fun testRefreshingRecords() {
26+
super.testRefreshingRecords()
27+
}
28+
}
29+
30+
@Disabled("We don't want this to run in CI")
31+
class S3V2ParquetSnappyPerformanceTest :
32+
BasicPerformanceTest(
33+
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH),
34+
configSpecClass = S3V2Specification::class.java,
35+
defaultRecordsToInsert = 1_000_000,
36+
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES,
37+
) {
38+
@Test
39+
override fun testInsertRecords() {
40+
super.testInsertRecords()
41+
}
42+
43+
@Test
44+
override fun testRefreshingRecords() {
45+
super.testRefreshingRecords()
46+
}
47+
}

0 commit comments

Comments
 (0)