🎉 Destination Redshift (copy): accept bucket path for staging data #8607

Merged 52 commits on Dec 17, 2021
Changes from 40 commits

Commits (52)
4f8eb4f
rename to Legacy
edgao Dec 7, 2021
0c4159d
add legacy test
edgao Dec 7, 2021
a62ead7
create S3StreamCopier
edgao Dec 7, 2021
daa9352
fix param deletion
edgao Dec 7, 2021
48d25d6
wip
edgao Dec 8, 2021
babcc77
wip
edgao Dec 8, 2021
098ce39
make tests work. mocks are awful.
edgao Dec 9, 2021
b949984
WIP replace old code; nothing works yet
edgao Dec 9, 2021
533880e
add getObjectKey; add S3CsvWriterTest
edgao Dec 9, 2021
36bdea8
write to multiple files correctly
edgao Dec 9, 2021
b03896a
correct deleteStagingFiles test
edgao Dec 9, 2021
dc8a2c4
completed things
edgao Dec 9, 2021
94029e4
fix test
edgao Dec 9, 2021
3019ef0
unit capitalization
edgao Dec 11, 2021
50a3a29
formatting
edgao Dec 11, 2021
7dfd452
wip
edgao Dec 11, 2021
15b9f27
remove mistaken dep
edgao Dec 13, 2021
c54baf3
use UUID suffix
edgao Dec 13, 2021
5c367df
various improvements
edgao Dec 13, 2021
2b27a4e
optional header; unit test file contents
edgao Dec 14, 2021
2bcf9a5
fix field name
edgao Dec 14, 2021
24c6a19
remove comment
edgao Dec 14, 2021
d860fd6
RECORD CLASS RECORD CLASS
edgao Dec 14, 2021
ff400c7
warning
edgao Dec 14, 2021
0fe979b
text block
edgao Dec 14, 2021
685f366
add more csv options
edgao Dec 14, 2021
d327bcc
update comment
edgao Dec 14, 2021
0f7b2bd
assert copy operation
edgao Dec 14, 2021
866db40
add test
edgao Dec 14, 2021
1b4f41f
cutover to non-legacy stream copier
edgao Dec 14, 2021
89f53fe
update param name
edgao Dec 14, 2021
5cd8b56
minor comments about sheet generator + Flattening
edgao Dec 14, 2021
7a1de37
timezones :(
edgao Dec 15, 2021
d4fcff7
add dup code comment
edgao Dec 15, 2021
9d3c6c1
delete redundant tests
edgao Dec 15, 2021
e975bb8
manifest also exists within bucketPath
edgao Dec 15, 2021
0ef601e
add comment
edgao Dec 15, 2021
734141d
better comment
edgao Dec 15, 2021
8cd2e51
rename getObjectKey + add javadoc
edgao Dec 15, 2021
842e1d9
explain default behavior
edgao Dec 15, 2021
485fe5d
remove from abstract classes
edgao Dec 15, 2021
1bdfab4
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 15, 2021
ed07183
reformat
edgao Dec 16, 2021
5d0427a
add implementation for getObjectPath
edgao Dec 16, 2021
fed6c36
prepare for publish
edgao Dec 16, 2021
f58bb90
follow doc conventions
edgao Dec 16, 2021
87b8543
follow doc conventions
edgao Dec 16, 2021
028c3b6
rename to getOutputPath
edgao Dec 16, 2021
192d6a3
add comment
edgao Dec 16, 2021
257dcb7
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 16, 2021
372098d
Merge branch 'master' into edgao/s3_based_stream_copier
edgao Dec 17, 2021
7625bee
regenerate seed specs
edgao Dec 17, 2021

@@ -10,6 +10,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.jdbc.copy.s3.LegacyS3StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetWriter;
@@ -24,7 +25,7 @@

/**
* This implementation is similar to
* {@link io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier}. The difference is that
* {@link LegacyS3StreamCopier}. The difference is that
* this implementation creates Parquet staging files, instead of CSV ones.
* <p>
* </p>

@@ -85,4 +85,9 @@ protected void closeWhenFail() throws IOException {
uploadManager.abort();
}

@Override
public String getObjectPath() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}

@@ -90,4 +90,9 @@ public CSVPrinter getCsvPrinter() {
return csvPrinter;
}

@Override
public String getObjectPath() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}

@@ -78,4 +78,9 @@ protected void closeWhenFail() {
uploadManager.abort();
}

@Override
public String getObjectPath() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}

@@ -109,4 +109,9 @@ public void close(final boolean hasFailed) throws IOException {
}
}

@Override
public String getObjectPath() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}

@@ -90,9 +90,8 @@ public void initialize() {
}

/**
* {@link AmazonS3#doesBucketExistV2} should be used to check the bucket existence. However, this
* method does not work for GCS. So we use {@link AmazonS3#headBucket} instead, which will throw an
* exception if the bucket does not exist, or there is no permission to access it.
* {@link AmazonS3#doesBucketExistV2} should be used to check the bucket existence. However, this method does not work for GCS. So we use {@link
* AmazonS3#headBucket} instead, which will throw an exception if the bucket does not exist, or there is no permission to access it.
*/
public boolean gcsBucketExist(final AmazonS3 s3Client, final String bucket) {
try {
@@ -141,4 +140,9 @@ public static String getOutputFilename(final Timestamp timestamp, final S3Format
format.getFileExtension());
}

@Override
public String getObjectPath() {
// TODO
throw new UnsupportedOperationException("not yet implemented");
}
}
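
The javadoc above notes that AmazonS3#doesBucketExistV2 does not work for GCS, so the existence check goes through AmazonS3#headBucket and treats an exception as "bucket missing or inaccessible". A minimal sketch of that pattern, with an illustrative class and method name (this is not the PR's implementation):

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.HeadBucketRequest;

public final class BucketExistenceCheckSketch {

  // Returns true only if the bucket exists and the client's credentials can access it.
  public static boolean bucketExists(final AmazonS3 s3Client, final String bucket) {
    try {
      // headBucket throws if the bucket does not exist or we lack permission to access it.
      s3Client.headBucket(new HeadBucketRequest(bucket));
      return true;
    } catch (final AmazonServiceException e) {
      return false;
    }
  }

}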

@@ -19,6 +19,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation "org.mockito:mockito-inline:4.1.0"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"

@@ -29,7 +29,7 @@ public class CopyConsumerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 mib
private static final int MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 1024 / 4; // 256 MiB

public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,

@@ -8,8 +8,7 @@
import java.util.UUID;

/**
* StreamCopier is responsible for writing to a staging persistence and providing methods to remove
* the staged data.
* StreamCopier is responsible for writing to a staging persistence and providing methods to remove the staged data.
*/
public interface StreamCopier {

@@ -19,8 +18,8 @@ public interface StreamCopier {
void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception;

/**
* Closes the writer for the stream to the staging persistence. This method should block until all
* buffered data has been written to the persistence.
* Closes the writer for the stream to the staging persistence. This method should block until all buffered data has been written to the
* persistence.
*/
void closeStagingUploader(boolean hasFailed) throws Exception;

@@ -30,8 +29,7 @@ public interface StreamCopier {
void createTemporaryTable() throws Exception;

/**
* Copies the staging file to the temporary table. This method should block until the copy/upload
* has completed.
* Copies the staging file to the temporary table. This method should block until the copy/upload has completed.
*/
void copyStagingFileToTemporaryTable() throws Exception;

@@ -53,15 +51,14 @@
String generateMergeStatement(String destTableName) throws Exception;

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after
* completion or failure.
* Cleans up the copier by removing the staging file and dropping the temporary table after completion or failure.
*/
void removeFileAndDropTmpTable() throws Exception;

/**
* Creates the staging file and all the necessary items to write data to this file.
*
* @return the name of the staging file
* @return A string that uniquely identifies the file. E.g. the filename, or a unique suffix that is appended to a shared filename prefix
*/
String prepareStagingFile();
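
For orientation, the copy consumer drives a StreamCopier implementation roughly in the order sketched below. This is only an illustration of the interface's lifecycle, not code from this PR; the class name, the single-record flow, and the absence of batching and error handling are all simplifications:

import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.UUID;

public final class StreamCopierLifecycleSketch {

  // Stages one record, then runs the copy/merge/cleanup sequence against the destination.
  public static void copyOneRecord(final StreamCopier copier, final AirbyteRecordMessage record) throws Exception {
    // Prepare (or reuse) a staging file and append the record to it.
    final String fileName = copier.prepareStagingFile();
    copier.write(UUID.randomUUID(), record, fileName);

    // Flush staged data, then move it through a temporary table into the destination.
    copier.closeStagingUploader(false);
    copier.createDestinationSchema();
    copier.createTemporaryTable();
    copier.copyStagingFileToTemporaryTable();
    final String destTable = copier.createDestinationTable();
    final String mergeSql = copier.generateMergeStatement(destTable);
    // The real consumer executes mergeSql against the destination database at this point.
    copier.removeFileAndDropTmpTable();
  }

}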


@@ -0,0 +1,235 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.jdbc.copy.s3;

import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.StagingFilenameGenerator;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @deprecated See {@link S3StreamCopier}
*/
@Deprecated
public abstract class LegacyS3StreamCopier implements StreamCopier {

Comment from the PR author (edgao):
This class is basically identical to S3StreamCopier on master; I just removed an unused parameter.

Full diff: [screenshot]

private static final Logger LOGGER = LoggerFactory.getLogger(LegacyS3StreamCopier.class);

private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default.
private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS;
// It is optimal to write every 10,000,000 records (BATCH_SIZE * DEFAULT_PART) to a new file.
// The BATCH_SIZE is defined in CopyConsumerFactory.
// The average size of such a file will be about 1 GB.
// This will make it easier to work with files and speed up writing large amounts of data.
// In addition, for a large number of records, the copy request will not be dropped due to QUERY_TIMEOUT
// when the records from the file are copied to the staging table.
public static final int MAX_PARTS_PER_FILE = 1000;

protected final AmazonS3 s3Client;
protected final S3DestinationConfig s3Config;
protected final String tmpTableName;
private final DestinationSyncMode destSyncMode;
protected final String schemaName;
protected final String streamName;
protected final JdbcDatabase db;
private final ExtendedNameTransformer nameTransformer;
private final SqlOperations sqlOperations;
protected final Set<String> s3StagingFiles = new HashSet<>();
private final Map<String, StreamTransferManager> multipartUploadManagers = new HashMap<>();
private final Map<String, MultiPartOutputStream> outputStreams = new HashMap<>();
private final Map<String, CSVPrinter> csvPrinters = new HashMap<>();
protected final String stagingFolder;
private final StagingFilenameGenerator filenameGenerator;

public LegacyS3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
this.stagingFolder = stagingFolder;
this.db = db;
this.nameTransformer = nameTransformer;
this.sqlOperations = sqlOperations;
this.tmpTableName = nameTransformer.getTmpTableName(streamName);
this.s3Client = client;
this.s3Config = s3Config;
this.filenameGenerator = new StagingFilenameGenerator(streamName, MAX_PARTS_PER_FILE);
}

private String prepareS3StagingFile() {
return String.join("/", stagingFolder, schemaName, filenameGenerator.getStagingFilename());
}

@Override
public String prepareStagingFile() {
final var name = prepareS3StagingFile();
if (!s3StagingFiles.contains(name)) {
s3StagingFiles.add(name);
LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize());
// The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not
// support streaming multipart uploads; the alternative is writing the entire output to disk
// before loading into S3, which is not feasible with large tables.
// Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached its
// configured part size.
// Memory consumption is (numUploadThreads + queue capacity) * part size = (10 + 10) * 10 = 200 MB at current configurations.
final var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.partSize(s3Config.getPartSize());
multipartUploadManagers.put(name, manager);
final var outputStream = manager.getMultiPartOutputStreams().get(0);
// We only need one output stream as we only have one input stream. This is reasonably performant.
// See the above comment.
outputStreams.put(name, outputStream);
final var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8);
try {
csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT));
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
return name;
}

@Override
public void write(final UUID id, final AirbyteRecordMessage recordMessage, final String s3FileName) throws Exception {
if (csvPrinters.containsKey(s3FileName)) {
csvPrinters.get(s3FileName).printRecord(id,
Jsons.serialize(recordMessage.getData()),
Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())));
}
}

@Override
public void closeStagingUploader(final boolean hasFailed) throws Exception {
if (hasFailed) {
for (final var multipartUploadManager : multipartUploadManagers.values()) {
multipartUploadManager.abort();
}
}
closeAndWaitForUpload();
}

@Override
public void createDestinationSchema() throws Exception {
LOGGER.info("Creating schema in destination if it doesn't exist: {}", schemaName);
sqlOperations.createSchemaIfNotExists(db, schemaName);
}

@Override
public void createTemporaryTable() throws Exception {
LOGGER.info("Preparing tmp table in destination for stream: {}, schema: {}, tmp table name: {}.", streamName, schemaName, tmpTableName);
sqlOperations.createTableIfNotExists(db, schemaName, tmpTableName);
}

@Override
public void copyStagingFileToTemporaryTable() throws Exception {
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName);
s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> {
copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config);
}));
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}

@Override
public String createDestinationTable() throws Exception {
final var destTableName = nameTransformer.getRawTableName(streamName);
LOGGER.info("Preparing table {} in destination.", destTableName);
sqlOperations.createTableIfNotExists(db, schemaName, destTableName);
LOGGER.info("Table {} in destination prepared.", tmpTableName);

return destTableName;
}

@Override
public String generateMergeStatement(final String destTableName) {
LOGGER.info("Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.", tmpTableName, destTableName, schemaName);
final var queries = new StringBuilder();
if (destSyncMode.equals(DestinationSyncMode.OVERWRITE)) {
queries.append(sqlOperations.truncateTableQuery(db, schemaName, destTableName));
LOGGER.info("Destination OVERWRITE mode detected. Dest table: {}, schema: {}, truncated.", destTableName, schemaName);
}
queries.append(sqlOperations.copyTableQuery(db, schemaName, tmpTableName, destTableName));
return queries.toString();
}

@Override
public void removeFileAndDropTmpTable() throws Exception {
s3StagingFiles.forEach(s3StagingFile -> {
LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile);
if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) {
s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile);
}
LOGGER.info("S3 staging file {} cleaned.", s3StagingFile);
});

LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName);
sqlOperations.dropTableIfExists(db, schemaName, tmpTableName);
LOGGER.info("{} tmp table in destination cleaned.", tmpTableName);
}

protected static String getFullS3Path(final String s3BucketName, final String s3StagingFile) {
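// Joining on "/" turns the "s3:/" prefix into "s3://<bucket>/<staging-file>".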
return String.join("/", "s3:/", s3BucketName, s3StagingFile);
}

/**
* Closes the printers/outputstreams and waits for any buffered uploads to complete.
*/
private void closeAndWaitForUpload() throws IOException {
LOGGER.info("Uploading remaining data for {} stream.", streamName);
for (final var csvPrinter : csvPrinters.values()) {
csvPrinter.close();
}
for (final var outputStream : outputStreams.values()) {
outputStream.close();
}
for (final var multipartUploadManager : multipartUploadManagers.values()) {
multipartUploadManager.complete();
}
LOGGER.info("All data for {} stream uploaded.", streamName);
}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
String s3FileLocation,
String schema,
String tableName,
S3DestinationConfig s3Config)
throws SQLException;

}