🎉 Destination Redshift: make purgeStagingData optional #8855

Merged · 7 commits · Dec 17, 2021
@@ -150,7 +150,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.22
dockerImageTag: 0.3.23
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- name: Rockset
@@ -2918,7 +2918,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.22"
- dockerImage: "airbyte/destination-redshift:0.3.23"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
@@ -3043,6 +3043,13 @@
\ in larger memory requirements. A rule of thumb is to multiply the part\
\ size by 10 to get the memory requirement. Modify this with care."
title: "Stream Part Size"
purge_staging_data:
title: "Purge Staging Files and Tables"
type: "boolean"
description: "Whether to delete the staging files from S3 after completing\
\ the sync. See the docs for details. Only relevant for COPY. Defaults\
\ to true."
default: true
supportsIncremental: true
supportsNormalization: true
supportsDBT: true
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.jdbc.copy.s3;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;

/**
* S3 copy destinations need an S3DestinationConfig to configure the basic upload behavior. We also
* want additional flags to configure behavior that only applies to the copy-to-S3 +
* load-into-warehouse portion. Currently this is just purgeStagingData, but this may expand.
*/
public record S3CopyConfig(boolean purgeStagingData, S3DestinationConfig s3Config) {

public static boolean shouldPurgeStagingData(final JsonNode config) {
if (config.get("purge_staging_data") == null) {
return true;
} else {
return config.get("purge_staging_data").asBoolean();
}
}

}
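For context, a minimal sketch of how a caller might assemble an S3CopyConfig from a raw connector config, mirroring the wiring RedshiftCopyS3Destination does later in this diff. The fake S3 values and the eight-argument S3DestinationConfig constructor are taken from the tests in this PR; treat the example class, its main method, and the argument types as illustrative assumptions rather than a canonical factory.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;

public class S3CopyConfigExample {

  public static void main(final String[] args) throws Exception {
    // Hypothetical connector config; only the purge_staging_data key matters here.
    final JsonNode config = new ObjectMapper().readTree("{\"purge_staging_data\": false}");

    // Fake S3 settings, using the same eight-argument constructor as the tests in this PR.
    final S3DestinationConfig s3Config = new S3DestinationConfig(
        "fake-endpoint", "fake-bucket", "fake-bucketPath", "fake-region",
        "fake-access-key-id", "fake-secret-access-key", 10, null);

    // shouldPurgeStagingData defaults to true when the key is absent, so existing
    // configs keep the old delete-after-sync behavior.
    final S3CopyConfig copyConfig =
        new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), s3Config);

    System.out.println("purgeStagingData = " + copyConfig.purgeStagingData());
  }
}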
@@ -47,6 +47,7 @@ public abstract class S3StreamCopier implements StreamCopier {
private final Timestamp uploadTime;
protected final String stagingFolder;
protected final Map<String, S3Writer> stagingWritersByFile = new HashMap<>();
private final boolean purgeStagingData;

// The number of batches of records that will be inserted into each file.
private final int maxPartsPerFile;
@@ -64,7 +65,7 @@ public S3StreamCopier(final String stagingFolder,
final String schema,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
final ConfiguredAirbyteStream configuredAirbyteStream,
@@ -81,7 +82,8 @@ public S3StreamCopier(final String stagingFolder,
this.uploadTime = uploadTime;
this.tmpTableName = nameTransformer.getTmpTableName(this.streamName);
this.s3Client = client;
this.s3Config = s3Config;
this.s3Config = config.s3Config();
this.purgeStagingData = config.purgeStagingData();

this.maxPartsPerFile = maxPartsPerFile;
this.partsAddedToCurrentFile = 0;
@@ -176,15 +178,17 @@ public String generateMergeStatement(final String destTableName) {

@Override
public void removeFileAndDropTmpTable() throws Exception {
for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
final String suffix = entry.getKey();
final String objectKey = entry.getValue().getOutputPath();

LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
s3Client.deleteObject(s3Config.getBucketName(), objectKey);
if (purgeStagingData) {
for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
final String suffix = entry.getKey();
final String objectKey = entry.getValue().getOutputPath();

LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
s3Client.deleteObject(s3Config.getBucketName(), objectKey);
}
LOGGER.info("S3 staging file {} cleaned.", suffix);
}
LOGGER.info("S3 staging file {} cleaned.", suffix);
}

LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
@@ -10,18 +10,17 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;

public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3CopyConfig> {

/**
* Used by the copy consumer.
*/
@Override
public StreamCopier create(final String configuredSchema,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
@@ -30,9 +29,9 @@ public StreamCopier create(final String configuredSchema,
try {
final AirbyteStream stream = configuredStream.getStream();
final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
final AmazonS3 s3Client = s3Config.getS3Client();
final AmazonS3 s3Client = config.s3Config().getS3Client();

return create(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
return create(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -45,7 +44,7 @@ protected abstract StreamCopier create(String stagingFolder,
String schema,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
S3CopyConfig config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations,
ConfiguredAirbyteStream configuredStream)
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.jdbc.copy.s3;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.junit.jupiter.api.Test;

public class S3CopyConfigTest {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Test
public void setsDefaultValues() throws IOException {
final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree("{}"));

assertTrue(purgeStagingData);
}

@Test
public void parsesPurgeStagingDataCorrectly() throws IOException {
final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree(
"""
{
"purge_staging_data": false
}
"""));

assertFalse(purgeStagingData);
}

}
@@ -10,6 +10,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import com.amazonaws.services.s3.AmazonS3Client;
@@ -130,7 +131,7 @@ public void setup() {
"fake-schema",
s3Client,
db,
S3_CONFIG,
new S3CopyConfig(true, S3_CONFIG),
new ExtendedNameTransformer(),
sqlOperations,
CONFIGURED_STREAM,
@@ -220,6 +221,41 @@ public void deletesStagingFiles() throws Exception {
verify(s3Client).deleteObject("fake-bucket", "fakeOutputPath-00000");
}

@Test
public void doesNotDeleteStagingFiles_if_purgeStagingDataDisabled() throws Exception {
copier = new S3StreamCopier(
"fake-staging-folder",
"fake-schema",
s3Client,
db,
// Explicitly disable purgeStagingData
new S3CopyConfig(false, S3_CONFIG),
new ExtendedNameTransformer(),
sqlOperations,
CONFIGURED_STREAM,
UPLOAD_TIME,
MAX_PARTS_PER_FILE) {

@Override
public void copyS3CsvFileIntoTable(
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config) {
copyArguments.add(new CopyArguments(database, s3FileLocation, schema, tableName, s3Config));
}

};

copier.prepareStagingFile();
doReturn(true).when(s3Client).doesObjectExist("fake-bucket", "fakeOutputPath-00000");

copier.removeFileAndDropTmpTable();

verify(s3Client, never()).deleteObject("fake-bucket", "fakeOutputPath-00000");
}

@Test
public void copiesCorrectFilesToTable() throws Exception {
// Generate two files
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.22
LABEL io.airbyte.version=0.3.23
LABEL io.airbyte.name=airbyte/destination-redshift
@@ -13,6 +13,7 @@
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.s3.S3Destination;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -42,7 +43,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
getDatabase(config),
getSqlOperations(),
getNameTransformer(),
getS3DestinationConfig(config),
new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), getS3DestinationConfig(config)),
catalog,
new RedshiftStreamCopierFactory(),
getConfiguredSchema(config));
@@ -11,6 +11,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
@@ -39,14 +40,17 @@ public RedshiftStreamCopier(final String stagingFolder,
final String schema,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
final ConfiguredAirbyteStream configuredAirbyteStream) {
this(
stagingFolder,
schema, client, db,
s3Config, nameTransformer,
schema,
client,
db,
config,
nameTransformer,
sqlOperations,
Timestamp.from(Instant.now()),
configuredAirbyteStream);
@@ -57,12 +61,21 @@ public RedshiftStreamCopier(final String stagingFolder,
final String schema,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
final Timestamp uploadTime,
final ConfiguredAirbyteStream configuredAirbyteStream) {
super(stagingFolder, schema, client, db, s3Config, nameTransformer, sqlOperations, configuredAirbyteStream, uploadTime, MAX_PARTS_PER_FILE);
super(stagingFolder,
schema,
client,
db,
config,
nameTransformer,
sqlOperations,
configuredAirbyteStream,
uploadTime,
MAX_PARTS_PER_FILE);
objectMapper = new ObjectMapper();
}

@@ -9,8 +9,8 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;

/**
@@ -23,11 +23,11 @@ public StreamCopier create(final String stagingFolder,
final String schema,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final S3CopyConfig config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations,
final ConfiguredAirbyteStream configuredStream) {
return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
}

}
@@ -111,6 +111,12 @@
"examples": ["10"],
"description": "Optional. Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
"title": "Stream Part Size"
},
"purge_staging_data": {
"title": "Purge Staging Files and Tables",
"type": "boolean",
"description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY. Defaults to true.",
"default": true
}
}
}
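For illustration, a user-facing Redshift config that opts out of purging could set the new key alongside the existing COPY settings. Only purge_staging_data is defined by this PR; the surrounding field names are assumptions based on a typical Redshift COPY configuration and the values are placeholders.

{
  "host": "example-cluster.abc123.us-east-1.redshift.amazonaws.com",
  "port": 5439,
  "database": "warehouse",
  "schema": "public",
  "username": "airbyte_user",
  "password": "****",
  "s3_bucket_name": "airbyte-staging",
  "s3_bucket_region": "us-east-1",
  "access_key_id": "****",
  "secret_access_key": "****",
  "part_size": 10,
  "purge_staging_data": false
}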
@@ -19,6 +19,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -70,15 +71,17 @@ public void setup() {
"fake-schema",
s3Client,
db,
new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-access-key-id",
"fake-secret-access-key",
PART_SIZE,
null),
new S3CopyConfig(
true,
new S3DestinationConfig(
"fake-endpoint",
"fake-bucket",
"fake-bucketPath",
"fake-region",
"fake-access-key-id",
"fake-secret-access-key",
PART_SIZE,
null)),
new ExtendedNameTransformer(),
sqlOperations,
UPLOAD_TIME,