Skip to content

Commit b715f6a

Browse files
edgaojrhizor
authored and committed
move S3Config into destination-s3; update dependencies accordingly (#8562)
1 parent 87046af commit b715f6a

File tree

21 files changed

+249
-380
lines changed

21 files changed

+249
-380
lines changed

airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1414
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
1515
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
16-
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
16+
import io.airbyte.integrations.destination.s3.S3Destination;
1717
import io.airbyte.protocol.models.AirbyteMessage;
1818
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1919
import java.util.function.Consumer;
@@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception {
3030

3131
@Override
3232
public AirbyteMessageConsumer getConsumer(final JsonNode config,
33-
final ConfiguredAirbyteCatalog catalog,
34-
final Consumer<AirbyteMessage> outputRecordCollector) {
33+
final ConfiguredAirbyteCatalog catalog,
34+
final Consumer<AirbyteMessage> outputRecordCollector) {
3535
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
3636
return CopyConsumerFactory.create(
3737
outputRecordCollector,
@@ -47,7 +47,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
4747
@Override
4848
public void checkPersistence(final JsonNode config) {
4949
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
50-
S3StreamCopier.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig().getS3Config());
50+
S3Destination.attemptS3WriteAndDelete(databricksConfig.getS3DestinationConfig(), "");
5151
}
5252

5353
@Override

airbyte-integrations/connectors/destination-jdbc/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies {
1010

1111
implementation project(':airbyte-db:lib')
1212
implementation project(':airbyte-integrations:bases:base-java')
13+
implementation project(':airbyte-integrations:connectors:destination-s3')
1314
implementation project(':airbyte-protocol:models')
1415

1516
implementation 'org.apache.commons:commons-lang3:3.11'

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3Config.java

-71
This file was deleted.

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

+15-70
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,15 @@
66

77
import alex.mojaki.s3upload.MultiPartOutputStream;
88
import alex.mojaki.s3upload.StreamTransferManager;
9-
import com.amazonaws.ClientConfiguration;
10-
import com.amazonaws.auth.AWSStaticCredentialsProvider;
11-
import com.amazonaws.auth.BasicAWSCredentials;
12-
import com.amazonaws.client.builder.AwsClientBuilder;
139
import com.amazonaws.services.s3.AmazonS3;
14-
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
1510
import io.airbyte.commons.json.Jsons;
1611
import io.airbyte.commons.lang.Exceptions;
1712
import io.airbyte.db.jdbc.JdbcDatabase;
1813
import io.airbyte.integrations.destination.ExtendedNameTransformer;
1914
import io.airbyte.integrations.destination.jdbc.SqlOperations;
2015
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
2116
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
17+
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
2218
import io.airbyte.protocol.models.AirbyteRecordMessage;
2319
import io.airbyte.protocol.models.DestinationSyncMode;
2420
import java.io.IOException;
@@ -43,10 +39,6 @@ public abstract class S3StreamCopier implements StreamCopier {
4339

4440
private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
4541
private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
46-
// The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives
47-
// us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit.
48-
// WARNING: Too large a part size can cause potential OOM errors.
49-
public static final int DEFAULT_PART_SIZE_MB = 10;
5042
// It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
5143
// The BATCH_SIZE is defined in CopyConsumerFactory.
5244
// The average size of such a file will be about 1 GB.
@@ -57,7 +49,7 @@ public abstract class S3StreamCopier implements StreamCopier {
5749
public static final int MAX_PARTS_PER_FILE = 1000;
5850

5951
protected final AmazonS3 s3Client;
60-
protected final S3Config s3Config;
52+
protected final S3DestinationConfig s3Config;
6153
protected final String tmpTableName;
6254
private final DestinationSyncMode destSyncMode;
6355
protected final String schemaName;
@@ -74,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier {
7466
private final StagingFilenameGenerator filenameGenerator;
7567

7668
public S3StreamCopier(final String stagingFolder,
77-
final DestinationSyncMode destSyncMode,
78-
final String schema,
79-
final String streamName,
80-
final String s3FileName,
81-
final AmazonS3 client,
82-
final JdbcDatabase db,
83-
final S3Config s3Config,
84-
final ExtendedNameTransformer nameTransformer,
85-
final SqlOperations sqlOperations) {
69+
final DestinationSyncMode destSyncMode,
70+
final String schema,
71+
final String streamName,
72+
final String s3FileName,
73+
final AmazonS3 client,
74+
final JdbcDatabase db,
75+
final S3DestinationConfig s3Config,
76+
final ExtendedNameTransformer nameTransformer,
77+
final SqlOperations sqlOperations) {
8678
this.destSyncMode = destSyncMode;
8779
this.schemaName = schema;
8880
this.streamName = streamName;
@@ -231,58 +223,11 @@ private void closeAndWaitForUpload() throws IOException {
231223
LOGGER.info("All data for {} stream uploaded.", streamName);
232224
}
233225

234-
public static void attemptS3WriteAndDelete(final S3Config s3Config) {
235-
attemptS3WriteAndDelete(s3Config, "");
236-
}
237-
238-
public static void attemptS3WriteAndDelete(final S3Config s3Config, final String bucketPath) {
239-
final var prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
240-
final String outputTableName = prefix + "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
241-
attemptWriteAndDeleteS3Object(s3Config, outputTableName);
242-
}
243-
244-
private static void attemptWriteAndDeleteS3Object(final S3Config s3Config, final String outputTableName) {
245-
final var s3 = getAmazonS3(s3Config);
246-
final var s3Bucket = s3Config.getBucketName();
247-
248-
s3.putObject(s3Bucket, outputTableName, "check-content");
249-
s3.deleteObject(s3Bucket, outputTableName);
250-
}
251-
252-
public static AmazonS3 getAmazonS3(final S3Config s3Config) {
253-
final var endpoint = s3Config.getEndpoint();
254-
final var region = s3Config.getRegion();
255-
final var accessKeyId = s3Config.getAccessKeyId();
256-
final var secretAccessKey = s3Config.getSecretAccessKey();
257-
258-
final var awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
259-
260-
if (endpoint.isEmpty()) {
261-
return AmazonS3ClientBuilder.standard()
262-
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
263-
.withRegion(s3Config.getRegion())
264-
.build();
265-
266-
} else {
267-
268-
final ClientConfiguration clientConfiguration = new ClientConfiguration();
269-
clientConfiguration.setSignerOverride("AWSS3V4SignerType");
270-
271-
return AmazonS3ClientBuilder
272-
.standard()
273-
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
274-
.withPathStyleAccessEnabled(true)
275-
.withClientConfiguration(clientConfiguration)
276-
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
277-
.build();
278-
}
279-
}
280-
281226
public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
282-
String s3FileLocation,
283-
String schema,
284-
String tableName,
285-
S3Config s3Config)
227+
String s3FileLocation,
228+
String schema,
229+
String tableName,
230+
S3DestinationConfig s3Config)
286231
throws SQLException;
287232

288233
}

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java

+17-16
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,29 @@
1010
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1111
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
1212
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
13+
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
1314
import io.airbyte.protocol.models.AirbyteStream;
1415
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
1516
import io.airbyte.protocol.models.DestinationSyncMode;
1617

17-
public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Config> {
18+
public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
1819

1920
/**
2021
* Used by the copy consumer.
2122
*/
2223
@Override
2324
public StreamCopier create(final String configuredSchema,
24-
final S3Config s3Config,
25-
final String stagingFolder,
26-
final ConfiguredAirbyteStream configuredStream,
27-
final ExtendedNameTransformer nameTransformer,
28-
final JdbcDatabase db,
29-
final SqlOperations sqlOperations) {
25+
final S3DestinationConfig s3Config,
26+
final String stagingFolder,
27+
final ConfiguredAirbyteStream configuredStream,
28+
final ExtendedNameTransformer nameTransformer,
29+
final JdbcDatabase db,
30+
final SqlOperations sqlOperations) {
3031
try {
3132
final AirbyteStream stream = configuredStream.getStream();
3233
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
3334
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
34-
final AmazonS3 s3Client = S3StreamCopier.getAmazonS3(s3Config);
35+
final AmazonS3 s3Client = s3Config.getS3Client();
3536

3637
return create(stagingFolder, syncMode, schema, stream.getName(), s3Client, db, s3Config, nameTransformer, sqlOperations);
3738
} catch (final Exception e) {
@@ -43,14 +44,14 @@ public StreamCopier create(final String configuredSchema,
4344
* For specific copier suppliers to implement.
4445
*/
4546
public abstract StreamCopier create(String stagingFolder,
46-
DestinationSyncMode syncMode,
47-
String schema,
48-
String streamName,
49-
AmazonS3 s3Client,
50-
JdbcDatabase db,
51-
S3Config s3Config,
52-
ExtendedNameTransformer nameTransformer,
53-
SqlOperations sqlOperations)
47+
DestinationSyncMode syncMode,
48+
String schema,
49+
String streamName,
50+
AmazonS3 s3Client,
51+
JdbcDatabase db,
52+
S3DestinationConfig s3Config,
53+
ExtendedNameTransformer nameTransformer,
54+
SqlOperations sqlOperations)
5455
throws Exception;
5556

5657
}

0 commit comments

Comments (0)