Skip to content

Commit 09e3ac5

Browse files
Fix PQ empty part issue
1 parent a40a40a commit 09e3ac5

File tree

4 files changed

+34
-13
lines changed

4 files changed

+34
-13
lines changed

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ class ObjectLoaderPartToObjectAccumulator(
6767

6868
override suspend fun accept(input: Part, state: State): Pair<State, ObjectResult?> {
6969
input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) }
70-
?: throw IllegalStateException("Empty part received: this should not happen")
70+
if (input.bytes == null) {
71+
throw IllegalStateException("Empty non-final part received: this should not happen")
72+
}
7173
state.bookkeeper.add(input)
7274
if (state.bookkeeper.isComplete) {
7375
return Pair(state, finish(state))

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ class ObjectLoaderRecordToPartAccumulator<T : OutputStream>(
6868
return State(stream, writerFactory.create(stream), partFactory)
6969
}
7070

71-
private fun makePart(state: State<T>): Part {
71+
private fun makePart(state: State<T>, forceFinish: Boolean = false): Part {
7272
state.writer.flush()
7373
val newSize = state.partFactory.totalSize + state.writer.bufferSize
74-
val isFinal =
74+
val isFinal = forceFinish ||
7575
newSize >= objectSizeBytes ||
7676
batchSizeOverride != null // HACK: This is a hack to force a flush
7777
val bytes =
@@ -109,6 +109,6 @@ class ObjectLoaderRecordToPartAccumulator<T : OutputStream>(
109109
}
110110

111111
override suspend fun finish(state: State<T>): Part {
112-
return makePart(state)
112+
return makePart(state, true)
113113
}
114114
}

airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt

+21-9
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,35 @@
44

55
package io.airbyte.integrations.destination.s3_v2
66

7-
import io.airbyte.cdk.load.command.Property
87
import io.airbyte.cdk.load.write.BasicPerformanceTest
9-
import org.junit.jupiter.api.Disabled
108
import org.junit.jupiter.api.Test
119

1210
//@Disabled("We don't want this to run in CI")
13-
class S3V2PerformanceTest :
11+
class S3V2JsonNoFrillsPerformanceTest :
1412
BasicPerformanceTest(
1513
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH),
1614
configSpecClass = S3V2Specification::class.java,
1715
defaultRecordsToInsert = 1_000_000,
18-
micronautProperties =
19-
mapOf(
20-
Property("airbyte.destination.aws.assume-role.access-key", "FOO") to "foo",
21-
Property("airbyte.destination.aws.assume-role.secret-key", "BAR") to "bar",
22-
Property("airbyte.destination.aws.assume-role.external-id", "BAZ") to "baz",
23-
),
16+
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES
17+
) {
18+
@Test
19+
override fun testInsertRecords() {
20+
super.testInsertRecords()
21+
}
22+
23+
@Test
24+
override fun testRefreshingRecords() {
25+
super.testRefreshingRecords()
26+
}
27+
}
28+
29+
class S3V2ParquetSnappyPerformanceTest :
30+
BasicPerformanceTest(
31+
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH),
32+
configSpecClass = S3V2Specification::class.java,
33+
defaultRecordsToInsert = 1_000_000,
34+
micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES
35+
,
2436
) {
2537
@Test
2638
override fun testInsertRecords() {

airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.destination.s3_v2
66

7+
import io.airbyte.cdk.load.command.Property
78
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
89
import io.airbyte.cdk.load.util.Jsons
910
import java.nio.file.Files
@@ -52,4 +53,10 @@ object S3V2TestUtils {
5253
assumeRoleExternalId,
5354
)
5455
}
56+
57+
val PERFORMANCE_TEST_MICRONAUT_PROPERTIES = mapOf(
58+
Property("airbyte.destination.aws.assume-role.access-key", "FOO") to "foo",
59+
Property("airbyte.destination.aws.assume-role.secret-key", "BAR") to "bar",
60+
Property("airbyte.destination.aws.assume-role.external-id", "BAZ") to "baz",
61+
)
5562
}

0 commit comments

Comments
 (0)