Skip to content

Commit aab795b

Browse files
DO NOT MERGE: Spammy s3 perf test override
1 parent 61d5a62 commit aab795b

File tree

7 files changed

+108
-13
lines changed

7 files changed

+108
-13
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+9
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
4444
import io.airbyte.cdk.load.task.internal.UpdateBatchStateTaskFactory
4545
import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
4646
import io.airbyte.cdk.load.util.setOnce
47+
import io.airbyte.cdk.load.write.WriteOpOverride
4748
import io.github.oshai.kotlinlogging.KotlinLogging
4849
import io.micronaut.context.annotation.Secondary
4950
import io.micronaut.context.annotation.Value
@@ -153,6 +154,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
153154
private val loadPipeline: LoadPipeline?,
154155
private val partitioner: InputPartitioner,
155156
private val updateBatchTaskFactory: UpdateBatchStateTaskFactory,
157+
private val writeOpOverride: WriteOpOverride? = null
156158
) : DestinationTaskLauncher {
157159
private val log = KotlinLogging.logger {}
158160

@@ -200,6 +202,13 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
200202
}
201203

202204
override suspend fun run() {
205+
if (writeOpOverride != null) {
206+
log.info { "Write operation override found, running override task." }
207+
return
208+
} else {
209+
log.info { "No write operation override found, continuing with normal operation." }
210+
}
211+
203212
// Start the input consumer ASAP
204213
log.info { "Starting input consumer task" }
205214
val inputConsumerTask =

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt

+12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.Operation
88
import io.airbyte.cdk.load.state.DestinationFailure
99
import io.airbyte.cdk.load.state.DestinationSuccess
1010
import io.airbyte.cdk.load.state.SyncManager
11+
import io.airbyte.cdk.load.task.Task
1112
import io.airbyte.cdk.load.task.TaskLauncher
1213
import io.github.oshai.kotlinlogging.KotlinLogging
1314
import io.micronaut.context.annotation.Factory
@@ -17,6 +18,8 @@ import java.io.InputStream
1718
import javax.inject.Singleton
1819
import kotlinx.coroutines.runBlocking
1920

21+
interface WriteOpOverride: Task
22+
2023
/**
2124
* Write operation. Executed by the core framework when the operation is "write". Launches the core
2225
* services and awaits completion.
@@ -26,12 +29,21 @@ import kotlinx.coroutines.runBlocking
2629
class WriteOperation(
2730
private val taskLauncher: TaskLauncher,
2831
private val syncManager: SyncManager,
32+
private val writeOpOverride: WriteOpOverride? = null
2933
) : Operation {
3034
val log = KotlinLogging.logger {}
3135

3236
override fun execute() = runBlocking {
3337
taskLauncher.run()
3438

39+
if (writeOpOverride != null) {
40+
val now = System.currentTimeMillis()
41+
log.info { "Running override task" }
42+
writeOpOverride.execute()
43+
log.info { "Write operation override took ${System.currentTimeMillis() - now} ms" }
44+
return@runBlocking
45+
}
46+
3547
when (val result = syncManager.awaitDestinationResult()) {
3648
is DestinationSuccess -> {
3749
if (!syncManager.allStreamsComplete()) {

airbyte-integrations/connectors/destination-s3/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: file
33
connectorType: destination
44
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
5-
dockerImageTag: 1.5.5
5+
dockerImageTag: 1.5.6
66
dockerRepository: airbyte/destination-s3
77
githubIssueLabel: destination-s3
88
icon: s3.svg

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ data class S3V2Configuration<T : OutputStream>(
4444

4545
/** Below has no effect until [S3V2ObjectLoader] is enabled. */
4646
val numPartWorkers: Int = 2,
47-
val numUploadWorkers: Int = 10,
47+
val numUploadWorkers: Int = 5,
4848
val maxMemoryRatioReservedForParts: Double = 0.2,
4949
val objectSizeBytes: Long = 200L * 1024 * 1024,
5050
val partSizeBytes: Long = 10L * 1024 * 1024,
@@ -73,11 +73,10 @@ class S3V2ConfigurationFactory :
7373
objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
7474
objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
7575
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
76-
numPartWorkers = pojo.numPartWorkers ?: 2,
77-
numUploadWorkers = pojo.numObjectLoaders ?: 3,
78-
maxMemoryRatioReservedForParts = pojo.maxMemoryRatioReservedForParts ?: 0.2,
79-
partSizeBytes = (pojo.partSizeMb ?: 10) * 1024L * 1024L,
80-
useLegacyClient = pojo.useLegacyClient ?: false,
76+
numUploadWorkers = pojo.numObjectLoaders ?: 25,
77+
partSizeBytes = (pojo.partSizeMb ?: 50) * 1024L * 1024L,
78+
useLegacyClient = pojo.useLegacyClient ?: true,
79+
objectSizeBytes = (pojo.totalDataMb ?: 2024) * 1024L * 1024L,
8180
)
8281
}
8382
}

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt

+3-5
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,12 @@ class S3V2Specification :
7676
)
7777
override val fileNamePattern: String? = null
7878

79-
@get:JsonProperty("num_part_workers")
80-
val numPartWorkers: Int? = null
81-
@get:JsonProperty("num_upload_workers")
79+
@get:JsonProperty("num_uploaders")
8280
val numObjectLoaders: Int? = null
8381
@get:JsonProperty("part_size_mb")
8482
val partSizeMb: Int? = null
85-
@get:JsonProperty("max_memory_ratio_reserved_for_parts")
86-
val maxMemoryRatioReservedForParts: Double? = null
83+
@get:JsonProperty("total_data_mb")
84+
val totalDataMb: Int? = null
8785
@get:JsonProperty("use_legacy_client")
8886
val useLegacyClient: Boolean? = null
8987
}

airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,12 @@ class S3V2WriteTestJsonUncompressed :
314314
schematizedArrayBehavior = SchematizedNestedValueBehavior.PASS_THROUGH,
315315
preserveUndeclaredFields = true,
316316
allTypesBehavior = Untyped,
317-
)
317+
) {
318+
@Test
319+
override fun testBasicWrite() {
320+
super.testBasicWrite()
321+
}
322+
}
318323

319324
class S3V2WriteTestJsonRootLevelFlattening :
320325
S3V2WriteTest(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.airbyte.integrations.destination.s3_v2
2+
3+
import io.airbyte.cdk.load.command.DestinationCatalog
4+
import io.airbyte.cdk.load.file.object_storage.PathFactory
5+
import io.airbyte.cdk.load.file.s3.S3Client
6+
import io.airbyte.cdk.load.task.SelfTerminating
7+
import io.airbyte.cdk.load.task.Task
8+
import io.airbyte.cdk.load.task.TerminalCondition
9+
import io.airbyte.cdk.load.write.WriteOpOverride
10+
import io.github.oshai.kotlinlogging.KotlinLogging
11+
import jakarta.inject.Singleton
12+
import kotlin.random.Random
13+
import kotlin.time.measureTime
14+
import kotlinx.coroutines.Dispatchers
15+
import kotlinx.coroutines.ExperimentalCoroutinesApi
16+
import kotlinx.coroutines.async
17+
import kotlinx.coroutines.awaitAll
18+
import kotlinx.coroutines.coroutineScope
19+
import kotlinx.coroutines.launch
20+
import kotlinx.coroutines.withContext
21+
22+
@Singleton
23+
class S3V2WriteOpOverride(
24+
private val client: S3Client,
25+
private val catalog: DestinationCatalog,
26+
private val config: S3V2Configuration<*>,
27+
private val pathFactory: PathFactory,
28+
): WriteOpOverride {
29+
private val log = KotlinLogging.logger { }
30+
31+
override val terminalCondition: TerminalCondition = SelfTerminating
32+
33+
@OptIn(ExperimentalCoroutinesApi::class)
34+
override suspend fun execute() = coroutineScope {
35+
val prng = Random(System.currentTimeMillis())
36+
val randomPart = prng.nextBytes(config.partSizeBytes.toInt())
37+
val randomString = randomPart.take(32).joinToString("") { "%02x".format(it) }
38+
val stream = catalog.streams.first()
39+
val objectKey = pathFactory.getFinalDirectory(stream) + "/mock-perf-test-$randomString"
40+
41+
val numParts = (config.objectSizeBytes / config.partSizeBytes).toInt()
42+
val partsPerWorker = numParts / config.numUploadWorkers
43+
val actualSizeBytes = partsPerWorker * config.numUploadWorkers * config.partSizeBytes
44+
45+
log.info {
46+
"root key=$objectKey; part_size=${config.partSizeBytes}b; num_parts=$numParts (per_worker=$partsPerWorker); total_size=${actualSizeBytes}b; num_workers=${config.numUploadWorkers}"
47+
}
48+
49+
val duration = measureTime {
50+
withContext(Dispatchers.IO.limitedParallelism(config.numUploadWorkers)) {
51+
(0 until config.numUploadWorkers).map {
52+
async {
53+
val workerKey = "$objectKey-worker-$it"
54+
log.info { "Starting upload to $workerKey" }
55+
val upload = client.startStreamingUpload(workerKey)
56+
repeat(partsPerWorker) {
57+
log.info { "Uploading part ${it + 1} of $workerKey" }
58+
upload.uploadPart(randomPart, it + 1)
59+
}
60+
log.info { "Completing upload to $workerKey" }
61+
upload.complete()
62+
}
63+
}.awaitAll()
64+
}
65+
}
66+
val mbs = actualSizeBytes.toFloat() / duration.inWholeSeconds.toFloat() / 1024 / 1024
67+
log.info {
68+
// format mbs to 2 decimal places
69+
"Uploaded $actualSizeBytes bytes in $duration seconds (${"%.2f".format(mbs)} MB/s)"
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)