Skip to content

Commit 7038533

Browse files
authored
🎉 Destination Redshift (copy): accept bucket path for staging data (#8607)
1 parent 16f176b commit 7038533

File tree

42 files changed

+1796
-186
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1796
-186
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
- name: Redshift
151151
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
152152
dockerRepository: airbyte/destination-redshift
153-
dockerImageTag: 0.3.21
153+
dockerImageTag: 0.3.22
154154
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
155155
icon: redshift.svg
156156
- name: Rockset
@@ -161,7 +161,7 @@
161161
- name: S3
162162
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
163163
dockerRepository: airbyte/destination-s3
164-
dockerImageTag: 0.1.16
164+
dockerImageTag: 0.2.0
165165
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
166166
icon: s3.svg
167167
- name: SFTP-JSON

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+9-2
Original file line numberDiff line numberDiff line change
@@ -2918,7 +2918,7 @@
29182918
supported_destination_sync_modes:
29192919
- "overwrite"
29202920
- "append"
2921-
- dockerImage: "airbyte/destination-redshift:0.3.21"
2921+
- dockerImage: "airbyte/destination-redshift:0.3.22"
29222922
spec:
29232923
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
29242924
connectionSpecification:
@@ -2979,6 +2979,13 @@
29792979
>AWS docs</a> for more details."
29802980
examples:
29812981
- "airbyte.staging"
2982+
s3_bucket_path:
2983+
title: "S3 Bucket Path"
2984+
type: "string"
2985+
description: "The directory under the S3 bucket where data will be written.\
2986+
\ If not provided, then defaults to the root directory."
2987+
examples:
2988+
- "data_sync/test"
29822989
s3_bucket_region:
29832990
title: "S3 Bucket Region"
29842991
type: "string"
@@ -3086,7 +3093,7 @@
30863093
supported_destination_sync_modes:
30873094
- "append"
30883095
- "overwrite"
3089-
- dockerImage: "airbyte/destination-s3:0.1.16"
3096+
- dockerImage: "airbyte/destination-s3:0.2.0"
30903097
spec:
30913098
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
30923099
connectionSpecification:

airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.airbyte.integrations.destination.ExtendedNameTransformer;
1111
import io.airbyte.integrations.destination.jdbc.SqlOperations;
1212
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
13+
import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
1314
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
1415
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
1516
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
@@ -23,9 +24,8 @@
2324
import org.slf4j.LoggerFactory;
2425

2526
/**
26-
* This implementation is similar to
27-
* {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that
28-
* this implementation creates Parquet staging files, instead of CSV ones.
27+
* This implementation is similar to {@link LegacyS3StreamCopier}. The difference is that this
28+
* implementation creates Parquet staging files, instead of CSV ones.
2929
* <p>
3030
* </p>
3131
* It does the following operations:

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class GcsAvroWriter extends BaseGcsWriter implements S3Writer {
3636
private final StreamTransferManager uploadManager;
3737
private final MultiPartOutputStream outputStream;
3838
private final DataFileWriter<GenericData.Record> dataFileWriter;
39+
private final String objectKey;
3940

4041
public GcsAvroWriter(final GcsDestinationConfig config,
4142
final AmazonS3 s3Client,
@@ -47,7 +48,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
4748
super(config, s3Client, configuredStream);
4849

4950
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
50-
final String objectKey = String.join("/", outputPrefix, outputFilename);
51+
objectKey = String.join("/", outputPrefix, outputFilename);
5152

5253
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
5354
objectKey);
@@ -85,4 +86,9 @@ protected void closeWhenFail() throws IOException {
8586
uploadManager.abort();
8687
}
8788

89+
@Override
90+
public String getOutputPath() {
91+
return objectKey;
92+
}
93+
8894
}

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class GcsCsvWriter extends BaseGcsWriter implements S3Writer {
3636
private final MultiPartOutputStream outputStream;
3737
private final CSVPrinter csvPrinter;
3838
private final String gcsCsvFileLocation; // this used in destination-bigquery (GCS upload type)
39+
private final String objectKey;
3940

4041
public GcsCsvWriter(final GcsDestinationConfig config,
4142
final AmazonS3 s3Client,
@@ -48,7 +49,7 @@ public GcsCsvWriter(final GcsDestinationConfig config,
4849
this.csvSheetGenerator = CsvSheetGenerator.Factory.create(configuredStream.getStream().getJsonSchema(), formatConfig);
4950

5051
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV);
51-
final String objectKey = String.join("/", outputPrefix, outputFilename);
52+
objectKey = String.join("/", outputPrefix, outputFilename);
5253
gcsCsvFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
5354

5455
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
@@ -90,4 +91,9 @@ public CSVPrinter getCsvPrinter() {
9091
return csvPrinter;
9192
}
9293

94+
@Override
95+
public String getOutputPath() {
96+
return objectKey;
97+
}
98+
9399
}

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer {
3535
private final StreamTransferManager uploadManager;
3636
private final MultiPartOutputStream outputStream;
3737
private final PrintWriter printWriter;
38+
private final String objectKey;
3839

3940
public GcsJsonlWriter(final GcsDestinationConfig config,
4041
final AmazonS3 s3Client,
@@ -43,7 +44,7 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
4344
super(config, s3Client, configuredStream);
4445

4546
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL);
46-
final String objectKey = String.join("/", outputPrefix, outputFilename);
47+
objectKey = String.join("/", outputPrefix, outputFilename);
4748

4849
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
4950

@@ -78,4 +79,9 @@ protected void closeWhenFail() {
7879
uploadManager.abort();
7980
}
8081

82+
@Override
83+
public String getOutputPath() {
84+
return objectKey;
85+
}
86+
8187
}

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import com.amazonaws.services.s3.AmazonS3;
88
import com.fasterxml.jackson.databind.ObjectMapper;
9-
import com.fasterxml.jackson.databind.ObjectWriter;
109
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
1110
import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
1211
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
@@ -37,10 +36,10 @@ public class GcsParquetWriter extends BaseGcsWriter implements S3Writer {
3736

3837
private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class);
3938
private static final ObjectMapper MAPPER = new ObjectMapper();
40-
private static final ObjectWriter WRITER = MAPPER.writer();
4139

4240
private final ParquetWriter<Record> parquetWriter;
4341
private final AvroRecordFactory avroRecordFactory;
42+
private final String objectKey;
4443

4544
public GcsParquetWriter(final GcsDestinationConfig config,
4645
final AmazonS3 s3Client,
@@ -52,7 +51,7 @@ public GcsParquetWriter(final GcsDestinationConfig config,
5251
super(config, s3Client, configuredStream);
5352

5453
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET);
55-
final String objectKey = String.join("/", outputPrefix, outputFilename);
54+
objectKey = String.join("/", outputPrefix, outputFilename);
5655
LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
5756

5857
final URI uri = new URI(String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename));
@@ -109,4 +108,9 @@ public void close(final boolean hasFailed) throws IOException {
109108
}
110109
}
111110

111+
@Override
112+
public String getOutputPath() {
113+
return objectKey;
114+
}
115+
112116
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.gcs.avro;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
9+
import static org.mockito.Mockito.mock;
10+
11+
import com.amazonaws.services.s3.AmazonS3;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
14+
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
15+
import io.airbyte.protocol.models.AirbyteStream;
16+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
17+
import java.io.IOException;
18+
import java.sql.Timestamp;
19+
import java.time.Instant;
20+
import org.apache.avro.Schema;
21+
import org.junit.jupiter.api.Test;
22+
23+
class GcsAvroWriterTest {
24+
25+
@Test
26+
public void generatesCorrectObjectPath() throws IOException {
27+
final GcsAvroWriter writer = new GcsAvroWriter(
28+
new GcsDestinationConfig(
29+
"fake-bucket",
30+
"fake-bucketPath",
31+
"fake-bucketRegion",
32+
null,
33+
new S3AvroFormatConfig(new ObjectMapper().createObjectNode())),
34+
mock(AmazonS3.class, RETURNS_DEEP_STUBS),
35+
new ConfiguredAirbyteStream()
36+
.withStream(new AirbyteStream()
37+
.withNamespace("fake-namespace")
38+
.withName("fake-stream")),
39+
Timestamp.from(Instant.ofEpochMilli(1234)),
40+
mock(Schema.class),
41+
null);
42+
43+
assertEquals("fake-bucketPath/fake_namespace/fake_stream/1970_01_01_1234_0.avro", writer.getOutputPath());
44+
}
45+
46+
}

airbyte-integrations/connectors/destination-jdbc/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
implementation 'com.fasterxml.jackson.core:jackson-databind'
2020

2121
testImplementation "org.testcontainers:postgresql:1.15.3"
22+
testImplementation "org.mockito:mockito-inline:4.1.0"
2223

2324
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
2425
integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class CopyConsumerFactory {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);
3131

32-
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 mib
32+
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB
3333

3434
public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
3535
final JdbcDatabase database,

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public interface StreamCopier {
6161
/**
6262
* Creates the staging file and all the necessary items to write data to this file.
6363
*
64-
* @return the name of the staging file
64+
* @return A string that uniquely identifies the file. E.g. the filename, or a unique suffix that is
65+
* appended to a shared filename prefix
6566
*/
6667
String prepareStagingFile();
6768

0 commit comments

Comments (0)