Commit 9ab7fa7

move S3Config into destination-s3; update deps
1 parent fa999d2 commit 9ab7fa7

File tree

15 files changed: +81 -78 lines


airbyte-integrations/connectors/destination-jdbc/build.gradle

+1

@@ -10,6 +10,7 @@ dependencies {
 
     implementation project(':airbyte-db:lib')
     implementation project(':airbyte-integrations:bases:base-java')
+    implementation project(':airbyte-integrations:connectors:destination-s3')
     implementation project(':airbyte-protocol:models')
 
     implementation 'org.apache.commons:commons-lang3:3.11'

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

+3 -53

@@ -6,19 +6,16 @@
 
 import alex.mojaki.s3upload.MultiPartOutputStream;
 import alex.mojaki.s3upload.StreamTransferManager;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.io.IOException;
@@ -43,10 +40,6 @@ public abstract class S3StreamCopier implements StreamCopier {
 
   private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
   private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
-  // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
-  // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
-  // WARNING: Too large a part size can cause potential OOM errors.
-  public static final int DEFAULT_PART_SIZE_MB = 10;
   // It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
   // The BATCH_SIZE is defined in CopyConsumerFactory.
   // The average size of such a file will be about 1 GB.
@@ -232,50 +225,7 @@ private void closeAndWaitForUpload() throws IOException {
   }
 
   public static void attemptS3WriteAndDelete(final S3Config s3Config) {
-    attemptS3WriteAndDelete(s3Config, "");
-  }
-
-  public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
-    final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
-    final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
-    attemptWriteAndDeleteS3Object(s3Config, outputTableName);
-  }
-
-  private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
-    final var s3 = getAmazonS3(s3Config);
-    final var s3Bucket = s3Config.getBucketName();
-
-    s3.putObject(s3Bucket, outputTableName, "check-content");
-    s3.deleteObject(s3Bucket, outputTableName);
-  }
-
-  public static AmazonS3 getAmazonS3(final S3Config s3Config) {
-    final var endpoint = s3Config.getEndpoint();
-    final var region = s3Config.getRegion();
-    final var accessKeyId = s3Config.getAccessKeyId();
-    final var secretAccessKey = s3Config.getSecretAccessKey();
-
-    final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
-
-    if (endpoint.isEmpty()) {
-      return AmazonS3ClientBuilder.standard()
-          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-          .withRegion(s3Config.getRegion())
-          .build();
-
-    } else {
-
-      final ClientConfiguration clientConfiguration = new ClientConfiguration();
-      clientConfiguration.setSignerOverride("AWSS3V4SignerType");
-
-      return AmazonS3ClientBuilder
-          .standard()
-          .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
-          .withPathStyleAccessEnabled(true)
-          .withClientConfiguration(clientConfiguration)
-          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-          .build();
-    }
+    S3Destination.attemptS3WriteAndDelete(s3Config, "");
   }
 
   public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java

+3 -1

@@ -10,6 +10,8 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
+import io.airbyte.integrations.destination.s3.S3Config;
+import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import io.airbyte.protocol.models.DestinationSyncMode;
@@ -31,7 +33,7 @@ public StreamCopier create(final String configuredSchema,
       final AirbyteStream stream = configuredStream.getStream();
       final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
-      final AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
+      final AmazonS3 s3Client = S3Destination.getAmazonS3(s3Config);
 
       return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
     } catch (final Exception e) {

airbyte-integrations/connectors/destination-redshift/build.gradle

+1

@@ -20,6 +20,7 @@ dependencies {
     implementation project(':airbyte-integrations:bases:base-java')
     implementation project(':airbyte-protocol:models')
     implementation project(':airbyte-integrations:connectors:destination-jdbc')
+    implementation project(':airbyte-integrations:connectors:destination-s3')
     implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
 
     implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java

+1 -1

@@ -13,8 +13,8 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.util.function.Consumer;

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java

+1 -1

@@ -11,10 +11,10 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.redshift.manifest.Entry;
 import io.airbyte.integrations.destination.redshift.manifest.Manifest;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.util.Optional;
 import java.util.UUID;

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java

+1 -1

@@ -9,8 +9,8 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
 public class RedshiftStreamCopierFactory extends S3StreamCopierFactory {

airbyte-integrations/connectors/destination-s3/build.gradle

-1

@@ -13,7 +13,6 @@ dependencies {
     implementation project(':airbyte-config:models')
     implementation project(':airbyte-protocol:models')
     implementation project(':airbyte-integrations:bases:base-java')
-    implementation project(':airbyte-integrations:connectors:destination-jdbc')
     implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
 
     // csv
airbyte-integrations/connectors/{destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3 → destination-s3/src/main/java/io/airbyte/integrations/destination/s3}/S3Config.java

@@ -2,12 +2,17 @@
  * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.jdbc.copy.s3;
+package io.airbyte.integrations.destination.s3;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
 public class S3Config {
 
+  // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
+  // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
+  // WARNING: Too large a part size can cause potential OOM errors.
+  public static final int DEFAULT_PART_SIZE_MB = 10;
+
   private final String endpoint;
   private final String bucketName;
   private final String accessKeyId;
@@ -16,12 +21,12 @@ public class S3Config {
   private final Integer partSize;
 
   public S3Config(
-      final String endpoint,
-      final String bucketName,
-      final String accessKeyId,
-      final String secretAccessKey,
-      final String region,
-      final Integer partSize) {
+                  final String endpoint,
+                  final String bucketName,
+                  final String accessKeyId,
+                  final String secretAccessKey,
+                  final String region,
+                  final Integer partSize) {
     this.endpoint = endpoint;
     this.bucketName = bucketName;
     this.accessKeyId = accessKeyId;
@@ -55,7 +60,7 @@ public Integer getPartSize() {
   }
 
   public static S3Config getS3Config(final JsonNode config) {
-    var partSize = S3StreamCopier.DEFAULT_PART_SIZE_MB;
+    var partSize = DEFAULT_PART_SIZE_MB;
     if (config.get("part_size") != null) {
       partSize = config.get("part_size").asInt();
     }
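For reference, a minimal sketch (not part of this commit) of constructing the relocated S3Config directly. Only the constructor signature, the getters referenced elsewhere in this diff, and DEFAULT_PART_SIZE_MB come from the class above; the bucket, credential, and region values are hypothetical placeholders.

    import io.airbyte.integrations.destination.s3.S3Config;

    public class S3ConfigExample {

      public static void main(final String[] args) {
        // Placeholder values for illustration only.
        final S3Config s3Config = new S3Config(
            "",                        // endpoint: "" selects the standard AWS endpoint
            "example-bucket",          // bucketName (hypothetical)
            "exampleAccessKeyId",      // accessKeyId (hypothetical)
            "exampleSecretAccessKey",  // secretAccessKey (hypothetical)
            "us-east-1",               // region (hypothetical)
            S3Config.DEFAULT_PART_SIZE_MB);

        System.out.println("Bucket: " + s3Config.getBucketName() + ", part size (MB): " + s3Config.getPartSize());
      }

    }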

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java

+52 -5

@@ -4,19 +4,24 @@
 
 package io.airbyte.integrations.destination.s3;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.BaseConnector;
 import io.airbyte.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.integrations.base.Destination;
 import io.airbyte.integrations.base.IntegrationRunner;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.s3.writer.ProductionWriterFactory;
 import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
+import java.util.UUID;
 import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +37,7 @@ public static void main(final String[] args) throws Exception {
   @Override
   public AirbyteConnectionStatus check(final JsonNode config) {
     try {
-      S3StreamCopier.attemptS3WriteAndDelete(S3Config.getS3Config(config), config.get("s3_bucket_path").asText());
+      attemptS3WriteAndDelete(S3Config.getS3Config(config), config.get("s3_bucket_path").asText());
       return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
     } catch (final Exception e) {
       LOGGER.error("Exception attempting to access the S3 bucket: ", e);
@@ -45,10 +50,52 @@ public AirbyteConnectionStatus check(final JsonNode config) {
 
   @Override
   public AirbyteMessageConsumer getConsumer(final JsonNode config,
-      final ConfiguredAirbyteCatalog configuredCatalog,
-      final Consumer<AirbyteMessage> outputRecordCollector) {
+                                            final ConfiguredAirbyteCatalog configuredCatalog,
+                                            final Consumer<AirbyteMessage> outputRecordCollector) {
     final S3WriterFactory formatterFactory = new ProductionWriterFactory();
     return new S3Consumer(S3DestinationConfig.getS3DestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
   }
 
+  public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
+    final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
+    final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
+    attemptWriteAndDeleteS3Object(s3Config, outputTableName);
+  }
+
+  private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
+    final var s3 = getAmazonS3(s3Config);
+    final var s3Bucket = s3Config.getBucketName();
+
+    s3.putObject(s3Bucket, outputTableName, "check-content");
+    s3.deleteObject(s3Bucket, outputTableName);
+  }
+
+  public static AmazonS3 getAmazonS3(final S3Config s3Config) {
+    final var endpoint = s3Config.getEndpoint();
+    final var region = s3Config.getRegion();
+    final var accessKeyId = s3Config.getAccessKeyId();
+    final var secretAccessKey = s3Config.getSecretAccessKey();
+
+    final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
+
+    if (endpoint.isEmpty()) {
+      return AmazonS3ClientBuilder.standard()
+          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+          .withRegion(s3Config.getRegion())
+          .build();
+
+    } else {
+
+      final ClientConfiguration clientConfiguration = new ClientConfiguration();
+      clientConfiguration.setSignerOverride("AWSS3V4SignerType");
+
+      return AmazonS3ClientBuilder
+          .standard()
+          .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
+          .withPathStyleAccessEnabled(true)
+          .withClientConfiguration(clientConfiguration)
+          .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+          .build();
+    }
+  }
 }
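For reference, a minimal usage sketch (not part of this commit) of the static helpers that now live on S3Destination. The method names and signatures are taken from the class above; the configuration values and bucket path are hypothetical placeholders, and the final call assumes the standard AmazonS3#doesBucketExistV2 method from the AWS SDK.

    import com.amazonaws.services.s3.AmazonS3;
    import io.airbyte.integrations.destination.s3.S3Config;
    import io.airbyte.integrations.destination.s3.S3Destination;

    public class S3ConnectionCheckExample {

      public static void main(final String[] args) {
        // Placeholder configuration; see the S3Config sketch earlier in this diff.
        final S3Config s3Config = new S3Config(
            "", "example-bucket", "exampleAccessKeyId", "exampleSecretAccessKey",
            "us-east-1", S3Config.DEFAULT_PART_SIZE_MB);

        // Writes and then deletes a "_airbyte_connection_test_<uuid>" object under the given
        // path, mirroring what S3Destination#check does with the "s3_bucket_path" setting.
        S3Destination.attemptS3WriteAndDelete(s3Config, "example/path");

        // The same client factory the JDBC-based stream copiers now delegate to.
        final AmazonS3 s3Client = S3Destination.getAmazonS3(s3Config);
        System.out.println("Bucket exists: " + s3Client.doesBucketExistV2(s3Config.getBucketName()));
      }

    }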

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java

+1 -4

@@ -12,12 +12,9 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 
 /**
- * This class is similar to {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3Config}. It
- * has an extra {@code bucketPath} parameter, which is necessary for more delicate data syncing to
- * S3.
+ * This class is similar to {@link S3Config}. It has an extra {@code bucketPath} parameter, which is necessary for more delicate data syncing to S3.
  */
 public class S3DestinationConfig {

airbyte-integrations/connectors/destination-snowflake/build.gradle

+1

@@ -34,6 +34,7 @@ dependencies {
     implementation project(':airbyte-db:lib')
     implementation project(':airbyte-integrations:bases:base-java')
     implementation project(':airbyte-integrations:connectors:destination-jdbc')
+    implementation project(':airbyte-integrations:connectors:destination-s3')
     implementation project(':airbyte-protocol:models')
 
     integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java

+1 -1

@@ -11,8 +11,8 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import java.util.function.Consumer;

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java

+1 -1

@@ -9,8 +9,8 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.DestinationSyncMode;
 import java.sql.SQLException;
 import org.slf4j.Logger;

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java

+1 -1

@@ -9,8 +9,8 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
-import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
+import io.airbyte.integrations.destination.s3.S3Config;
 import io.airbyte.protocol.models.DestinationSyncMode;
 
 public class SnowflakeS3StreamCopierFactory extends S3StreamCopierFactory {
