
Commit 391668d

edgao authored and schlattk committed
🎉 Destination Redshift: add option to enable/disable deleting staging data (airbytehq#8855)
1 parent 3ab3cd2 commit 391668d

File tree

14 files changed: +172 −38 lines


‎airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+1 −1

@@ -150,7 +150,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.22
+  dockerImageTag: 0.3.23
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
 - name: Rockset

‎airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+8 −1

@@ -2918,7 +2918,7 @@
   supported_destination_sync_modes:
   - "overwrite"
   - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.22"
+- dockerImage: "airbyte/destination-redshift:0.3.23"
  spec:
    documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
    connectionSpecification:
@@ -3043,6 +3043,13 @@
            \ in larger memory requirements. A rule of thumb is to multiply the part\
            \ size by 10 to get the memory requirement. Modify this with care."
          title: "Stream Part Size"
+        purge_staging_data:
+          title: "Purge Staging Files and Tables"
+          type: "boolean"
+          description: "Whether to delete the staging files from S3 after completing\
+            \ the sync. See the docs for details. Only relevant for COPY. Defaults\
+            \ to true."
+          default: true
  supportsIncremental: true
  supportsNormalization: true
  supportsDBT: true

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfig.java (new file)

+25 −0

@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc.copy.s3;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+
+/**
+ * S3 copy destinations need an S3DestinationConfig to configure the basic upload behavior. We also
+ * want additional flags to configure behavior that only applies to the copy-to-S3 +
+ * load-into-warehouse portion. Currently this is just purgeStagingData, but this may expand.
+ */
+public record S3CopyConfig(boolean purgeStagingData, S3DestinationConfig s3Config) {
+
+  public static boolean shouldPurgeStagingData(final JsonNode config) {
+    if (config.get("purge_staging_data") == null) {
+      return true;
+    } else {
+      return config.get("purge_staging_data").asBoolean();
+    }
+  }
+
+}
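
For orientation, a minimal sketch (not part of this commit; the example class name is invented) of how the purge flag is derived from the connector's JSON config. A missing key falls back to true, an explicit value is honored; the record then pairs that flag with the existing S3DestinationConfig, as RedshiftCopyS3Destination does further down.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;

public class S3CopyConfigExample {

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    // Key absent: keep the previous behavior and purge staging data after the sync.
    final JsonNode defaults = mapper.readTree("{}");
    System.out.println(S3CopyConfig.shouldPurgeStagingData(defaults)); // true

    // Explicit opt-out: staging files and tables are left in place.
    final JsonNode optOut = mapper.readTree("{\"purge_staging_data\": false}");
    System.out.println(S3CopyConfig.shouldPurgeStagingData(optOut)); // false
  }

}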

‎airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java

+14 −10

@@ -47,6 +47,7 @@ public abstract class S3StreamCopier implements StreamCopier {
   private final Timestamp uploadTime;
   protected final String stagingFolder;
   protected final Map<String, S3Writer> stagingWritersByFile = new HashMap<>();
+  private final boolean purgeStagingData;
 
   // The number of batches of records that will be inserted into each file.
   private final int maxPartsPerFile;
@@ -64,7 +65,7 @@ public S3StreamCopier(final String stagingFolder,
                        final String schema,
                        final AmazonS3 client,
                        final JdbcDatabase db,
-                       final S3DestinationConfig s3Config,
+                       final S3CopyConfig config,
                        final ExtendedNameTransformer nameTransformer,
                        final SqlOperations sqlOperations,
                        final ConfiguredAirbyteStream configuredAirbyteStream,
@@ -81,7 +82,8 @@ public S3StreamCopier(final String stagingFolder,
     this.uploadTime = uploadTime;
     this.tmpTableName = nameTransformer.getTmpTableName(this.streamName);
     this.s3Client = client;
-    this.s3Config = s3Config;
+    this.s3Config = config.s3Config();
+    this.purgeStagingData = config.purgeStagingData();
 
     this.maxPartsPerFile = maxPartsPerFile;
     this.partsAddedToCurrentFile = 0;
@@ -176,15 +178,17 @@ public String generateMergeStatement(final String destTableName) {
 
   @Override
   public void removeFileAndDropTmpTable() throws Exception {
-    for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
-      final String suffix = entry.getKey();
-      final String objectKey = entry.getValue().getOutputPath();
-
-      LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
-      if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
-        s3Client.deleteObject(s3Config.getBucketName(), objectKey);
+    if (purgeStagingData) {
+      for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
+        final String suffix = entry.getKey();
+        final String objectKey = entry.getValue().getOutputPath();
+
+        LOGGER.info("Begin cleaning s3 staging file {}.", objectKey);
+        if (s3Client.doesObjectExist(s3Config.getBucketName(), objectKey)) {
+          s3Client.deleteObject(s3Config.getBucketName(), objectKey);
+        }
+        LOGGER.info("S3 staging file {} cleaned.", suffix);
       }
-      LOGGER.info("S3 staging file {} cleaned.", suffix);
     }
 
     LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);

‎airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java

+5 −6

@@ -10,18 +10,17 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 
-public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
+public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3CopyConfig> {
 
   /**
    * Used by the copy consumer.
    */
   @Override
   public StreamCopier create(final String configuredSchema,
-                             final S3DestinationConfig s3Config,
+                             final S3CopyConfig config,
                              final String stagingFolder,
                              final ConfiguredAirbyteStream configuredStream,
                              final ExtendedNameTransformer nameTransformer,
@@ -30,9 +29,9 @@ public StreamCopier create(final String configuredSchema,
     try {
       final AirbyteStream stream = configuredStream.getStream();
       final String schema = StreamCopierFactory.getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
-      final AmazonS3 s3Client = s3Config.getS3Client();
+      final AmazonS3 s3Client = config.s3Config().getS3Client();
 
-      return create(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
+      return create(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
@@ -45,7 +44,7 @@ protected abstract StreamCopier create(String stagingFolder,
                                          String schema,
                                          AmazonS3 s3Client,
                                          JdbcDatabase db,
-                                         S3DestinationConfig s3Config,
+                                         S3CopyConfig config,
                                          ExtendedNameTransformer nameTransformer,
                                          SqlOperations sqlOperations,
                                          ConfiguredAirbyteStream configuredStream)

airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3CopyConfigTest.java (new file)

+37 −0

@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.jdbc.copy.s3;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+
+public class S3CopyConfigTest {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Test
+  public void setsDefaultValues() throws IOException {
+    final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree("{}"));
+
+    assertTrue(purgeStagingData);
+  }
+
+  @Test
+  public void parsesPurgeStagingDataCorrectly() throws IOException {
+    final boolean purgeStagingData = S3CopyConfig.shouldPurgeStagingData(OBJECT_MAPPER.readTree(
+        """
+        {
+          "purge_staging_data": false
+        }
+        """));
+
+    assertFalse(purgeStagingData);
+  }
+
+}

‎airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierTest.java

+37 −1

@@ -10,6 +10,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -130,7 +131,7 @@ public void setup() {
         "fake-schema",
         s3Client,
         db,
-        S3_CONFIG,
+        new S3CopyConfig(true, S3_CONFIG),
         new ExtendedNameTransformer(),
         sqlOperations,
         CONFIGURED_STREAM,
@@ -220,6 +221,41 @@ public void deletesStagingFiles() throws Exception {
     verify(s3Client).deleteObject("fake-bucket", "fakeOutputPath-00000");
   }
 
+  @Test
+  public void doesNotDeleteStagingFiles_if_purgeStagingDataDisabled() throws Exception {
+    copier = new S3StreamCopier(
+        "fake-staging-folder",
+        "fake-schema",
+        s3Client,
+        db,
+        // Explicitly disable purgeStagingData
+        new S3CopyConfig(false, S3_CONFIG),
+        new ExtendedNameTransformer(),
+        sqlOperations,
+        CONFIGURED_STREAM,
+        UPLOAD_TIME,
+        MAX_PARTS_PER_FILE) {
+
+      @Override
+      public void copyS3CsvFileIntoTable(
+                                         final JdbcDatabase database,
+                                         final String s3FileLocation,
+                                         final String schema,
+                                         final String tableName,
+                                         final S3DestinationConfig s3Config) {
+        copyArguments.add(new CopyArguments(database, s3FileLocation, schema, tableName, s3Config));
+      }
+
+    };
+
+    copier.prepareStagingFile();
+    doReturn(true).when(s3Client).doesObjectExist("fake-bucket", "fakeOutputPath-00000");
+
+    copier.removeFileAndDropTmpTable();
+
+    verify(s3Client, never()).deleteObject("fake-bucket", "fakeOutputPath-00000");
+  }
+
   @Test
   public void copiesCorrectFilesToTable() throws Exception {
     // Generate two files

‎airbyte-integrations/connectors/destination-redshift/Dockerfile

+1 −1

@@ -6,5 +6,5 @@ ENV APPLICATION destination-redshift
 
 ADD build/distributions/${APPLICATION}*.tar /airbyte
 
-LABEL io.airbyte.version=0.3.22
+LABEL io.airbyte.version=0.3.23
 LABEL io.airbyte.name=airbyte/destination-redshift

‎airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftCopyS3Destination.java

+2 −1

@@ -13,6 +13,7 @@
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory;
 import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.s3.S3Destination;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteMessage;
@@ -42,7 +43,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
         getDatabase(config),
         getSqlOperations(),
         getNameTransformer(),
-        getS3DestinationConfig(config),
+        new S3CopyConfig(S3CopyConfig.shouldPurgeStagingData(config), getS3DestinationConfig(config)),
         catalog,
         new RedshiftStreamCopierFactory(),
         getConfiguredSchema(config));

‎airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java

+18 −5

@@ -11,6 +11,7 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier;
 import io.airbyte.integrations.destination.redshift.manifest.Entry;
 import io.airbyte.integrations.destination.redshift.manifest.Manifest;
@@ -39,14 +40,17 @@ public RedshiftStreamCopier(final String stagingFolder,
                              final String schema,
                              final AmazonS3 client,
                              final JdbcDatabase db,
-                             final S3DestinationConfig s3Config,
+                             final S3CopyConfig config,
                              final ExtendedNameTransformer nameTransformer,
                              final SqlOperations sqlOperations,
                              final ConfiguredAirbyteStream configuredAirbyteStream) {
     this(
         stagingFolder,
-        schema, client, db,
-        s3Config, nameTransformer,
+        schema,
+        client,
+        db,
+        config,
+        nameTransformer,
         sqlOperations,
         Timestamp.from(Instant.now()),
         configuredAirbyteStream);
@@ -57,12 +61,21 @@ public RedshiftStreamCopier(final String stagingFolder,
                              final String schema,
                              final AmazonS3 client,
                              final JdbcDatabase db,
-                             final S3DestinationConfig s3Config,
+                             final S3CopyConfig config,
                              final ExtendedNameTransformer nameTransformer,
                              final SqlOperations sqlOperations,
                              final Timestamp uploadTime,
                              final ConfiguredAirbyteStream configuredAirbyteStream) {
-    super(stagingFolder, schema, client, db, s3Config, nameTransformer, sqlOperations, configuredAirbyteStream, uploadTime, MAX_PARTS_PER_FILE);
+    super(stagingFolder,
+        schema,
+        client,
+        db,
+        config,
+        nameTransformer,
+        sqlOperations,
+        configuredAirbyteStream,
+        uploadTime,
+        MAX_PARTS_PER_FILE);
     objectMapper = new ObjectMapper();
   }

‎airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java

+3 −3

@@ -9,8 +9,8 @@
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
 import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopierFactory;
-import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 
 /**
@@ -23,11 +23,11 @@ public StreamCopier create(final String stagingFolder,
                             final String schema,
                             final AmazonS3 s3Client,
                             final JdbcDatabase db,
-                            final S3DestinationConfig s3Config,
+                            final S3CopyConfig config,
                             final ExtendedNameTransformer nameTransformer,
                             final SqlOperations sqlOperations,
                             final ConfiguredAirbyteStream configuredStream) {
-    return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, s3Config, nameTransformer, sqlOperations, configuredStream);
+    return new RedshiftStreamCopier(stagingFolder, schema, s3Client, db, config, nameTransformer, sqlOperations, configuredStream);
   }
 
 }

‎airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json

+6 −0

@@ -111,6 +111,12 @@
       "examples": ["10"],
       "description": "Optional. Increase this if syncing tables larger than 100GB. Only relevant for COPY. Files are streamed to S3 in parts. This determines the size of each part, in MBs. As S3 has a limit of 10,000 parts per file, part size affects the table size. This is 10MB by default, resulting in a default limit of 100GB tables. Note, a larger part size will result in larger memory requirements. A rule of thumb is to multiply the part size by 10 to get the memory requirement. Modify this with care.",
       "title": "Stream Part Size"
+    },
+    "purge_staging_data": {
+      "title": "Purge Staging Files and Tables",
+      "type": "boolean",
+      "description": "Whether to delete the staging files from S3 after completing the sync. See the docs for details. Only relevant for COPY. Defaults to true.",
+      "default": true
     }
   }
 }
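
For illustration, disabling the cleanup from the user's side only requires adding the new field to the Redshift destination config; the snippet below shows just that field, with the usual connection and S3 staging settings omitted.

{
  "purge_staging_data": false
}

Leaving the field out keeps the existing behavior: staging files are deleted from S3 after each sync (default true).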

‎airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierTest.java

+12 −9

@@ -19,6 +19,7 @@
 import io.airbyte.db.jdbc.JdbcDatabase;
 import io.airbyte.integrations.destination.ExtendedNameTransformer;
 import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.s3.S3CopyConfig;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.protocol.models.AirbyteStream;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -70,15 +71,17 @@ public void setup() {
         "fake-schema",
         s3Client,
         db,
-        new S3DestinationConfig(
-            "fake-endpoint",
-            "fake-bucket",
-            "fake-bucketPath",
-            "fake-region",
-            "fake-access-key-id",
-            "fake-secret-access-key",
-            PART_SIZE,
-            null),
+        new S3CopyConfig(
+            true,
+            new S3DestinationConfig(
+                "fake-endpoint",
+                "fake-bucket",
+                "fake-bucketPath",
+                "fake-region",
+                "fake-access-key-id",
+                "fake-secret-access-key",
+                PART_SIZE,
+                null)),
         new ExtendedNameTransformer(),
         sqlOperations,
         UPLOAD_TIME,
