diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index 3cf381f101c62..4705e5eb53e85 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -150,7 +150,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.22
+  dockerImageTag: 0.3.23
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
 - name: Rockset
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index fe77429f6aebe..bc92ecd78d71e 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -2918,7 +2918,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.22"
+- dockerImage: "airbyte/destination-redshift:0.3.23"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
     connectionSpecification:
@@ -3043,6 +3043,13 @@
             \ in larger memory requirements. A rule of thumb is to multiply the part\
             \ size by 10 to get the memory requirement. Modify this with care."
           title: "Stream Part Size"
+        purge_staging_data:
+          title: "Purge Staging Files and Tables"
+          type: "boolean"
+          description: "Whether to delete the staging files from S3 after completing\
+            \ the sync. See the docs for details. Only relevant for COPY. Defaults\
+            \ to true."
+          default: true
   supportsIncremental: true
   supportsNormalization: true
   supportsDBT: true
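The `purge_staging_data` option added to the spec above is optional, with `default: true`. A minimal sketch (not part of the diff) of how an absent versus explicit flag resolves under that rule, using Jackson the same way the connector's tests below do; the class name is hypothetical:

```java
// Hypothetical example class, not part of this PR.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PurgeFlagResolutionExample {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // Flag omitted: path() returns a MissingNode, and asBoolean(true) falls back
    // to the spec's default, so staging data is purged.
    final JsonNode omitted = mapper.readTree("{}");
    System.out.println(omitted.path("purge_staging_data").asBoolean(true)); // true

    // Flag set explicitly to false: staging files and tables are kept after the sync.
    final JsonNode explicit = mapper.readTree("{\"purge_staging_data\": false}");
    System.out.println(explicit.path("purge_staging_data").asBoolean(true)); // false
  }

}
```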
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfig.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfig.java
new file mode 100644
index 0000000000000..bfce8529dfceb
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfig.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc.copy.s3;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+
+/**
+ * S3 copy destinations need an S3DestinationConfig to configure the basic upload behavior. We also
+ * want additional flags to configure behavior that only applies to the copy-to-S3 +
+ * load-into-warehouse portion. Currently this is just purgeStagingData, but this may expand.
+ */
+public record S3CopyConfig(boolean purgeStagingData, S3DestinationConfig s3Config) {
+
+  public static boolean shouldPurgeStagingData(final JsonNode config) {
+    if (config.get("purge_staging_data") == null) {
+      return true;
+    } else {
+      return config.get("purge_staging_data").asBoolean();
+    }
+  }
+
+}
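A standalone sketch (not part of the diff) of how the new record gets wired together, following the pattern `RedshiftCopyS3Destination` uses later in this PR. All values are placeholders, and the `S3DestinationConfig` constructor arguments mirror the test fixtures further down:

```java
// Hypothetical example class, not part of this PR.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;

public class S3CopyConfigWiringExample {

  public static void main(final String[] args) throws Exception {
    final var config = new ObjectMapper().readTree("{\"purge_staging_data\": false}");

    // Placeholder values; the argument order follows RedshiftStreamCopierTest below.
    final S3DestinationConfig s3Config = new S3DestinationConfig(
        "fake-endpoint",
        "fake-bucket",
        "fake-bucketPath",
        "fake-region",
        "fake-access-key-id",
        "fake-secret-access-key",
        10, // part size in MB
        null);

    // Bundle the purge flag with the base S3 settings for the copy consumer.
    final S3CopyConfig copyConfig =
        new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), s3Config);

    System.out.println(copyConfig.purgeStagingData());         // false
    System.out.println(copyConfig.s3Config().getBucketName()); // fake-bucket
  }

}
```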
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
index 2c69ff10e3efc..a8492074571a4 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java
@@ -47,6 +47,7 @@ public abstract class S3StreamCopier implements StreamCopier {
   private final Timestamp uploadTime;
   protected final String stagingFolder;
   protected final Map stagingWritersByFile = new HashMap<>();
+  private final boolean purgeStagingData;
 
   // The number of batches of records that will be inserted into each file.
   private final int maxPartsPerFile;
@@ -64,7 +65,7 @@ public S3StreamCopier(final String stagingFolder,
                         final String schema,
                         final AmazonS3 client,
                         final JdbcDatabase db,
-                        final S3DestinationConfig s3Config,
+                        final S3CopyConfig config,
                         final ExtendedNameTransformer nameTransformer,
                         final SqlOperations sqlOperations,
                         final ConfiguredAirbyteStream configuredAirbyteStream,
@@ -81,7 +82,8 @@ public S3StreamCopier(final String stagingFolder,
     this.uploadTime = uploadTime;
     this.tmpTableName = nameTransformer.getTmpTableName(this.streamName);
     this.s3Client = client;
-    this.s3Config = s3Config;
+    this.s3Config = config.s3Config();
+    this.purgeStagingData = config.purgeStagingData();
 
     this.maxPartsPerFile = maxPartsPerFile;
     this.partsAddedToCurrentFile = 0;
@@ -176,15 +178,17 @@ public String generateMergeStatement(final String destTableName) {
 
   @Override
   public void removeFileAndDropTmpTable() throws Exception {
-    for (final Map.Entry entry : stagingWritersByFile.entrySet()) {
-      final String suffix = entry.getKey();
-      final String objectKey = entry.getValue().getOutputPath();
-
-      LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
-      if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
-        s3Client.deleteObject(s3Config.getBucketName(), objectKey);
+    if (purgeStagingData) {
+      for (final Map.Entry entry : stagingWritersByFile.entrySet()) {
+        final String suffix = entry.getKey();
+        final String objectKey = entry.getValue().getOutputPath();
+
+        LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
+        if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
+          s3Client.deleteObject(s3Config.getBucketName(), objectKey);
+        }
+        LOGGER.info("S3 staging file {} cleaned.", suffix);
       }
-      LOGGER.info("S3 staging file {} cleaned.", suffix);
     }
 
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
index f8d10ac047c82..6aad77fcc951d 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java
@@ -10,18 +10,17 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 
-public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
+public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3CopyConfig> {
 
   /**
    * Used by the copy consumer.
    */
   @Override
   public StreamCopier create(final String configuredSchema,
-                             final S3DestinationConfig s3Config,
+                             final S3CopyConfig config,
                              final String stagingFolder,
                              final ConfiguredAirbyteStream configuredStream,
                              final ExtendedNameTransformer nameTransformer,
@@ -30,9 +29,9 @@ public StreamCopier create(final String configuredSchema,
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
-      final AmazonS3 s3Client = s3Config.getS3Client();
+      final AmazonS3 s3Client = config.s3Config().getS3Client();
 
-      return create(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
+      return create(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
@@ -45,7 +44,7 @@ protected abstract StreamCopier create(String stagingFolder,
                                          String schema,
                                          AmazonS3 s3Client,
                                          JdbcDatabase db,
-                                         S3DestinationConfig s3Config,
+                                         S3CopyConfig config,
                                          ExtendedNameTransformer nameTransformer,
                                          SqlOperations sqlOperations,
                                          ConfiguredAirbyteStream configuredStream)
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfigTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfigTest.java
new file mode 100644
index 0000000000000..4622a985ff11d
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfigTest.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc.copy.s3;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+
+public class S3CopyConfigTest {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Test
+  public void setsDefaultValues() throws IOException {
+    final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree("{}"));
+
+    assertTrue(purgeStagingData);
+  }
+
+  @Test
+  public void parsesPurgeStagingDataCorrectly() throws IOException {
+    final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree(
+        """
+        {
+          "purge_staging_data": false
+        }
+        """));
+
+    assertFalse(purgeStagingData);
+  }
+
+}
diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java
index 29da764edcf5b..2ddab245b7de7 100644
--- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java
+++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java
@@ -10,6 +10,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -130,7 +131,7 @@ public void setup() {
         "fake-schema",
         s3Client,
         db,
-        S3_CONFIG,
+        new S3CopyConfig(true, S3_CONFIG),
         new ExtendedNameTransformer(),
         sqlOperations,
         CONFIGURED_STREAM,
@@ -220,6 +221,41 @@ public void deletesStagingFiles() throws Exception {
     verify(s3Client).deleteObject("fake-bucket", "fakeOutputPath-00000");
   }
 
+  @Test
+  public void doesNotDeleteStagingFiles_if_purgeStagingDataDisabled() throws Exception {
+    copier = new S3StreamCopier(
+        "fake-staging-folder",
+        "fake-schema",
+        s3Client,
+        db,
+        // Explicitly disable purgeStagingData
+        new S3CopyConfig(false, S3_CONFIG),
+        new ExtendedNameTransformer(),
+        sqlOperations,
+        CONFIGURED_STREAM,
+        UPLOAD_TIME,
+        MAX_PARTS_PER_FILE) {
+
+      @Override
+      public void copyS3CsvFileIntoTable(
+                                         final JdbcDatabase database,
+                                         final String s3FileLocation,
+                                         final String schema,
+                                         final String tableName,
+                                         final S3DestinationConfig s3Config) {
+        copyArguments.add(new CopyArguments(database, s3FileLocation, schema, tableName, s3Config));
+      }
+
+    };
+
+    copier.prepareStagingFile();
+    doReturn(true).when(s3Client).doesObjectExist("fake-bucket", "fakeOutputPath-00000");
+
+    copier.removeFileAndDropTmpTable();
+
+    verify(s3Client, never()).deleteObject("fake-bucket", "fakeOutputPath-00000");
+  }
+
   @Test
   public void copiesCorrectFilesToTable() throws Exception {
     // Generate two files
diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile
index e1b74f53c5e52..9d247e2507b96 100644
--- a/airbyte-integrations/connectors/destination-redshift/Dockerfile
+++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.3.22
+LABEL io.airbyte.version=0.3.23
 LABEL io.airbyte.name=airbyte/destination-redshift
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
index 1220ecfc19889..1b5249d0ed7fa 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java
@@ -13,6 +13,7 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
@@ -42,7 +43,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         getDatabase(config),
         getSqlOperations(),
         getNameTransformer(),
-        getS3DestinationConfig(config),
+        new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), getS3DestinationConfig(config)),
         catalog,
         new RedshiftStreamCopierFactory(),
         getConfiguredSchema(config));
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
index 90821165b5eba..c296ddf1d1265 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java
@@ -11,6 +11,7 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.redshift.manifest.Entry;
 import io.airbyte.integrations.destination.redshift.manifest.Manifest;
@@ -39,14 +40,17 @@ public RedshiftStreamCopier(final String stagingFolder,
                               final String schema,
                               final AmazonS3 client,
                               final JdbcDatabase db,
-                              final S3DestinationConfig s3Config,
+                              final S3CopyConfig config,
                               final ExtendedNameTransformer nameTransformer,
                               final SqlOperations sqlOperations,
                               final ConfiguredAirbyteStream configuredAirbyteStream) {
     this(
         stagingFolder,
-        schema, client, db,
-        s3Config, nameTransformer,
+        schema,
+        client,
+        db,
+        config,
+        nameTransformer,
         sqlOperations,
         Timestamp.from(Instant.now()),
         configuredAirbyteStream);
@@ -57,12 +61,21 @@ public RedshiftStreamCopier(final String stagingFolder,
                               final String schema,
                               final AmazonS3 client,
                               final JdbcDatabase db,
-                              final S3DestinationConfig s3Config,
+                              final S3CopyConfig config,
                               final ExtendedNameTransformer nameTransformer,
                               final SqlOperations sqlOperations,
                               final Timestamp uploadTime,
                               final ConfiguredAirbyteStream configuredAirbyteStream) {
-    super(stagingFolder, schema, client, db, s3Config, nameTransformer, sqlOperations, configuredAirbyteStream, uploadTime, MAX_PARTS_PER_FILE);
+    super(stagingFolder,
+        schema,
+        client,
+        db,
+        config,
+        nameTransformer,
+        sqlOperations,
+        configuredAirbyteStream,
+        uploadTime,
+        MAX_PARTS_PER_FILE);
     objectMapper = new ObjectMapper();
   }
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
index beee10e81f368..9876f03800b29 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java
@@ -9,8 +9,8 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 
 /**
@@ -23,11 +23,11 @@ public StreamCopier create(final String stagingFolder,
                              final String schema,
                              final AmazonS3 s3Client,
                              final JdbcDatabase db,
-                             final S3DestinationConfig s3Config,
+                             final S3CopyConfig config,
                              final ExtendedNameTransformer nameTransformer,
                              final SqlOperations sqlOperations,
                              final ConfiguredAirbyteStream configuredStream) {
-    return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
+    return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
   }
 
 }
diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json
index b1f932a018c35..360372f2ca895 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json
+++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json
@@ -111,6 +111,12 @@
       "examples": ["10"],
       "description": "Optional. Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
       "title": "Stream Part Size"
+    },
+    "purge_staging_data": {
+      "title": "Purge Staging Files and Tables",
+      "type": "boolean",
+      "description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY. Defaults to true.",
+      "default": true
     }
   }
 }
Defaults to true.", + "default": true } } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java index f91daefd8cb51..3f2b44536be6d 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java @@ -19,6 +19,7 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; +import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -70,15 +71,17 @@ public void setup() { "fake-schema", s3Client, db, - new S3DestinationConfig( - "fake-endpoint", - "fake-bucket", - "fake-bucketPath", - "fake-region", - "fake-access-key-id", - "fake-secret-access-key", - PART_SIZE, - null), + new S3CopyConfig( + true, + new S3DestinationConfig( + "fake-endpoint", + "fake-bucket", + "fake-bucketPath", + "fake-region", + "fake-access-key-id", + "fake-secret-access-key", + PART_SIZE, + null)), new ExtendedNameTransformer(), sqlOperations, UPLOAD_TIME, diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 2b4406ff7803f..3437d4079a572 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -88,6 +88,8 @@ Provide the required S3 info. Optional parameters: * **Bucket Path** * The directory within the S3 bucket to place the staging data. For example, if you set this to `yourFavoriteSubdirectory`, we will place the staging data inside `s3://yourBucket/yourFavoriteSubdirectory`. If not provided, defaults to the root directory. +* **Purge Staging Data** + * Whether to delete the staging files from S3 after completing the sync. Specifically, the connector will create CSV files named `bucketPath/namespace/streamName/syncDate_epochMillis_randomUuid.csv` containing three columns (`ab_id`, `data`, `emitted_at`). Normally these files are deleted after the `COPY` command completes; if you want to keep them for other purposes, set `purge_staging_data` to `false`. ## Notes about Redshift Naming Conventions @@ -122,6 +124,7 @@ All Redshift connections are encrypted using SSL | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.23 | 2021-12-16 | [\#8855](https://github.com/airbytehq/airbyte/pull/8855) | Add `purgeStagingData` option to enable/disable deleting the staging data | | 0.3.22 | 2021-12-15 | [#8607](https://github.com/airbytehq/airbyte/pull/8607) | Accept a path for the staging data | | 0.3.21 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management | | 0.3.20 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |