Skip to content

Commit 7a9e31c

Browse files
committed
wip
1 parent adf72a2 commit 7a9e31c

File tree

7 files changed

+82
-23
lines changed

7 files changed

+82
-23
lines changed

airbyte-integrations/connectors/destination-jdbc/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
implementation 'com.fasterxml.jackson.core:jackson-databind'
2020

2121
testImplementation "org.testcontainers:postgresql:1.15.3"
22+
testImplementation "org.mockito:mockito-inline:4.1.0"
2223

2324
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
2425
integrationTestJavaImplementation "org.testcontainers:postgresql:1.15.3"

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/LegacyS3StreamCopierFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
1616
import io.airbyte.protocol.models.DestinationSyncMode;
1717

18-
// TODO create new S3StreamCopierFactory
18+
/**
19+
* See {@link S3StreamCopierFactory} instead.
20+
*/
1921
@Deprecated
2022
public abstract class LegacyS3StreamCopierFactory implements StreamCopierFactory<S3DestinationConfig> {
2123

airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/copy/s3/LegacyS3StreamCopierTest.java

+48-19
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,53 @@
66

77
import static org.junit.jupiter.api.Assertions.assertEquals;
88
import static org.junit.jupiter.api.Assertions.assertThrows;
9-
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.ArgumentMatchers.anyInt;
1010
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
11-
import static org.mockito.Mockito.clearInvocations;
1211
import static org.mockito.Mockito.doReturn;
1312
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.mockConstructionWithAnswer;
1414
import static org.mockito.Mockito.verify;
1515

16+
import alex.mojaki.s3upload.StreamTransferManager;
1617
import com.amazonaws.services.s3.AmazonS3Client;
1718
import io.airbyte.db.jdbc.JdbcDatabase;
1819
import io.airbyte.integrations.destination.ExtendedNameTransformer;
1920
import io.airbyte.integrations.destination.jdbc.SqlOperations;
2021
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
2122
import io.airbyte.protocol.models.AirbyteRecordMessage;
2223
import io.airbyte.protocol.models.DestinationSyncMode;
24+
import java.util.List;
2325
import java.util.UUID;
26+
import org.junit.jupiter.api.AfterEach;
2427
import org.junit.jupiter.api.BeforeEach;
2528
import org.junit.jupiter.api.Test;
2629
import org.junit.platform.commons.util.ExceptionUtils;
30+
import org.mockito.MockedConstruction;
2731

2832
/**
2933
* Tests to help define what the legacy S3 stream copier did.
3034
* <p>
31-
* Somewhat sketchily verifies what the AmazonS3Client does, even though the stream copier only actually interacts with it via StreamTransferManager
32-
* instances. The interactions are mostly obvious enough that this feels fine.
33-
* <p>
3435
* Does not verify SQL operations, as they're fairly transparent.
3536
*/
3637
public class LegacyS3StreamCopierTest {
3738

39+
public static final int PART_SIZE = 5;
40+
3841
private AmazonS3Client s3Client;
3942
private JdbcDatabase db;
4043
private SqlOperations sqlOperations;
4144
private LegacyS3StreamCopier copier;
4245

46+
private MockedConstruction<StreamTransferManager> streamTransferManagerMockedConstruction;
47+
4348
@BeforeEach
4449
public void setup() {
4550
s3Client = mock(AmazonS3Client.class, RETURNS_DEEP_STUBS);
4651
db = mock(JdbcDatabase.class);
4752
sqlOperations = mock(SqlOperations.class);
4853

54+
streamTransferManagerMockedConstruction = mockConstructionWithAnswer(StreamTransferManager.class, RETURNS_DEEP_STUBS);
55+
4956
copier = new LegacyS3StreamCopier(
5057
"fake-staging-folder",
5158
DestinationSyncMode.OVERWRITE,
@@ -60,6 +67,7 @@ public void setup() {
6067
"fake-region",
6168
"fake-access-key-id",
6269
"fake-secret-access-key",
70+
PART_SIZE,
6371
null
6472
),
6573
new ExtendedNameTransformer(),
@@ -74,45 +82,66 @@ public void copyS3CsvFileIntoTable(
7482
final S3DestinationConfig s3Config) {
7583
throw new UnsupportedOperationException("not implemented");
7684
}
77-
7885
};
7986
}
8087

88+
@AfterEach
89+
public void teardown() {
90+
streamTransferManagerMockedConstruction.close();
91+
}
92+
8193
@Test
8294
public void createSequentialStagingFiles_when_multipleFilesRequested() {
83-
// Each file will contain multiple parts, so the first MAX_PARTS_PER_FILE will all go into the same file
84-
for (var i = 0; i < LegacyS3StreamCopier.MAX_PARTS_PER_FILE; i++) {
85-
final String file1 = copier.prepareStagingFile();
86-
assertEquals("fake-staging-folder/fake-schema/fake-stream_00000", file1, "preparing file number " + i);
95+
// When we call prepareStagingFile() the first time, it should create exactly one upload manager
96+
final String firstFile = copier.prepareStagingFile();
97+
assertEquals("fake-staging-folder/fake-schema/fake-stream_00000", firstFile);
98+
final List<StreamTransferManager> firstManagers = streamTransferManagerMockedConstruction.constructed();
99+
final StreamTransferManager firstManager = firstManagers.get(0);
100+
verify(firstManager.numUploadThreads(anyInt()).queueCapacity(anyInt())).partSize(PART_SIZE);
101+
assertEquals(1, firstManagers.size(), "There were actually " + firstManagers.size() + " upload managers");
102+
103+
// Each file will contain multiple parts, so the first MAX_PARTS_PER_FILE will all go into the same file (i.e. we should not start more uploads)
104+
// We've already called prepareStagingFile() once, so only go to MAX_PARTS_PER_FILE - 1
105+
for (var i = 0; i < LegacyS3StreamCopier.MAX_PARTS_PER_FILE - 1; i++) {
106+
final String existingFile = copier.prepareStagingFile();
107+
assertEquals("fake-staging-folder/fake-schema/fake-stream_00000", existingFile, "preparing file number " + i);
108+
final int streamManagerCount = streamTransferManagerMockedConstruction.constructed().size();
109+
assertEquals(1, streamManagerCount, "There were actually " + streamManagerCount + " upload managers");
87110
}
88-
verify(s3Client).initiateMultipartUpload(any());
89-
clearInvocations(s3Client);
90111

91-
final String file2 = copier.prepareStagingFile();
92-
assertEquals("fake-staging-folder/fake-schema/fake-stream_00001", file2);
93-
verify(s3Client).initiateMultipartUpload(any());
112+
// Now that we've hit the MAX_PARTS_PER_FILE, we should start a new upload
113+
final String secondFile = copier.prepareStagingFile();
114+
assertEquals("fake-staging-folder/fake-schema/fake-stream_00001", secondFile);
115+
final List<StreamTransferManager> secondManagers = streamTransferManagerMockedConstruction.constructed();
116+
final StreamTransferManager secondManager = secondManagers.get(1);
117+
verify(secondManager.numUploadThreads(anyInt()).queueCapacity(anyInt())).partSize(PART_SIZE);
118+
assertEquals(2, secondManagers.size(), "There were actually " + secondManagers.size() + " upload managers");
94119
}
95120

96121
@Test
97122
public void closesS3Upload_when_stagingUploaderClosedSuccessfully() throws Exception {
98-
final String file = copier.prepareStagingFile();
99-
copier.write(UUID.randomUUID(), new AirbyteRecordMessage().withEmittedAt(84L), file);
123+
copier.prepareStagingFile();
100124

101125
copier.closeStagingUploader(false);
102126

103-
verify(s3Client).completeMultipartUpload(any());
127+
final List<StreamTransferManager> managers = streamTransferManagerMockedConstruction.constructed();
128+
final StreamTransferManager manager = managers.get(0);
129+
verify(manager).numUploadThreads(10);
130+
verify(manager).complete();
104131
}
105132

106133
@Test
107134
public void closesS3Upload_when_stagingUploaderClosedFailingly() throws Exception {
108135
final String file = copier.prepareStagingFile();
136+
// This is needed to trick the StreamTransferManager into thinking it has data that needs to be written.
109137
copier.write(UUID.randomUUID(), new AirbyteRecordMessage().withEmittedAt(84L), file);
110138

111139
// TODO why does this throw an interruptedexception
112140
final RuntimeException exception = assertThrows(RuntimeException.class, () -> copier.closeStagingUploader(true));
113141

114142
// the wrapping chain is RuntimeException -> ExecutionException -> RuntimeException -> InterruptedException
115-
assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(), "Original exception: " + ExceptionUtils.readStackTrace(exception));
143+
assertEquals(InterruptedException.class, exception.getCause().getCause().getCause().getClass(),
144+
"Original exception: " + ExceptionUtils.readStackTrace(exception));
116145
}
117146

118147
@Test

airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json

+6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@
5454
"description": "The name of the staging S3 bucket to use if utilising a COPY strategy. COPY is recommended for production workloads for better speed and scalability. See <a href=\"https://docs.aws.amazon.com/redshift/latest/dg/c_loading-data-best-practices.html\">AWS docs</a> for more details.",
5555
"examples": ["airbyte.staging"]
5656
},
57+
"s3_bucket_path": {
58+
"title": "S3 Bucket Path",
59+
"type": "string",
60+
"description": "The directory under the S3 bucket where data will be written.",
61+
"examples": ["data_sync/test"]
62+
},
5763
"s3_bucket_region": {
5864
"title": "S3 Bucket Region",
5965
"type": "string",

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java

+13
Original file line numberDiff line numberDiff line change
@@ -135,4 +135,17 @@ public AmazonS3 getS3Client() {
135135
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
136136
.build();
137137
}
138+
139+
public S3DestinationConfig cloneWithFormatConfig(final S3FormatConfig formatConfig) {
140+
return new S3DestinationConfig(
141+
this.endpoint,
142+
this.bucketName,
143+
this.bucketPath,
144+
this.bucketRegion,
145+
this.accessKeyId,
146+
this.secretAccessKey,
147+
this.partSize,
148+
formatConfig
149+
);
150+
}
138151
}

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvFormatConfig.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,15 @@ public String getValue() {
4545
private final Long partSize;
4646

4747
public S3CsvFormatConfig(final JsonNode formatConfig) {
48-
this.flattening = Flattening.fromValue(formatConfig.get("flattening").asText());
49-
this.partSize = formatConfig.get(PART_SIZE_MB_ARG_NAME) != null ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() : null;
48+
this(
49+
Flattening.fromValue(formatConfig.get("flattening").asText()),
50+
formatConfig.get(PART_SIZE_MB_ARG_NAME) != null ? formatConfig.get(PART_SIZE_MB_ARG_NAME).asLong() : null
51+
);
52+
}
53+
54+
public S3CsvFormatConfig(final Flattening flattening, final Long partSize) {
55+
this.flattening = flattening;
56+
this.partSize = partSize;
5057
}
5158

5259
@Override

airbyte-integrations/connectors/source-jdbc/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ dependencies {
4848
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-api:5.4.2'
4949
testFixturesImplementation 'org.junit.jupiter:junit-jupiter-params:5.4.2'
5050
testFixturesImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.4.6'
51+
testFixturesImplementation group: 'org.mockito', name: 'mockito-inline', version: '4.1.0'
5152

5253
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
53-
}
54+
}

0 commit comments

Comments
 (0)