Skip to content

Commit ae36c6d

Browse files
DO NOT MERGE: Spammy s3 perf test override
1 parent 61d5a62 commit ae36c6d

File tree

6 files changed

+102
-11
lines changed

6 files changed

+102
-11
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory
4444
import io.airbyte.cdk.load.task.internal.UpdateBatchStateTaskFactory
4545
import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask
4646
import io.airbyte.cdk.load.util.setOnce
47+
import io.airbyte.cdk.load.write.WriteOpOverride
4748
import io.github.oshai.kotlinlogging.KotlinLogging
4849
import io.micronaut.context.annotation.Secondary
4950
import io.micronaut.context.annotation.Value
@@ -153,6 +154,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
153154
private val loadPipeline: LoadPipeline?,
154155
private val partitioner: InputPartitioner,
155156
private val updateBatchTaskFactory: UpdateBatchStateTaskFactory,
157+
private val writeOpOverride: WriteOpOverride? = null
156158
) : DestinationTaskLauncher {
157159
private val log = KotlinLogging.logger {}
158160

@@ -200,6 +202,10 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
200202
}
201203

202204
override suspend fun run() {
205+
if (writeOpOverride != null) {
206+
return
207+
}
208+
203209
// Start the input consumer ASAP
204210
log.info { "Starting input consumer task" }
205211
val inputConsumerTask =

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/WriteOperation.kt

+12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.Operation
88
import io.airbyte.cdk.load.state.DestinationFailure
99
import io.airbyte.cdk.load.state.DestinationSuccess
1010
import io.airbyte.cdk.load.state.SyncManager
11+
import io.airbyte.cdk.load.task.Task
1112
import io.airbyte.cdk.load.task.TaskLauncher
1213
import io.github.oshai.kotlinlogging.KotlinLogging
1314
import io.micronaut.context.annotation.Factory
@@ -17,6 +18,8 @@ import java.io.InputStream
1718
import javax.inject.Singleton
1819
import kotlinx.coroutines.runBlocking
1920

21+
/**
 * Marker for an optional injectable task that, when a binding is present, replaces the
 * standard write path: the task launcher returns immediately and [WriteOperation] runs
 * this task instead of awaiting the destination result.
 */
interface WriteOpOverride : Task
22+
2023
/**
2124
* Write operation. Executed by the core framework when the operation is "write". Launches the core
2225
* services and awaits completion.
@@ -26,12 +29,21 @@ import kotlinx.coroutines.runBlocking
2629
class WriteOperation(
2730
private val taskLauncher: TaskLauncher,
2831
private val syncManager: SyncManager,
32+
private val writeOpOverride: WriteOpOverride? = null
2933
) : Operation {
3034
val log = KotlinLogging.logger {}
3135

3236
override fun execute() = runBlocking {
3337
taskLauncher.run()
3438

39+
if (writeOpOverride != null) {
40+
val now = System.currentTimeMillis()
41+
log.info { "Running override task" }
42+
writeOpOverride.execute()
43+
log.info { "Write operation override took ${System.currentTimeMillis() - now} ms" }
44+
return@runBlocking
45+
}
46+
3547
when (val result = syncManager.awaitDestinationResult()) {
3648
is DestinationSuccess -> {
3749
if (!syncManager.allStreamsComplete()) {

airbyte-integrations/connectors/destination-s3/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: file
33
connectorType: destination
44
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
5-
dockerImageTag: 1.5.5
5+
dockerImageTag: 1.5.6
66
dockerRepository: airbyte/destination-s3
77
githubIssueLabel: destination-s3
88
icon: s3.svg

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ data class S3V2Configuration<T : OutputStream>(
4444

4545
/** Below has no effect until [S3V2ObjectLoader] is enabled. */
4646
val numPartWorkers: Int = 2,
47-
val numUploadWorkers: Int = 10,
47+
val numUploadWorkers: Int = 5,
4848
val maxMemoryRatioReservedForParts: Double = 0.2,
4949
val objectSizeBytes: Long = 200L * 1024 * 1024,
5050
val partSizeBytes: Long = 10L * 1024 * 1024,
@@ -73,11 +73,10 @@ class S3V2ConfigurationFactory :
7373
objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
7474
objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
7575
objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
76-
numPartWorkers = pojo.numPartWorkers ?: 2,
77-
numUploadWorkers = pojo.numObjectLoaders ?: 3,
78-
maxMemoryRatioReservedForParts = pojo.maxMemoryRatioReservedForParts ?: 0.2,
79-
partSizeBytes = (pojo.partSizeMb ?: 10) * 1024L * 1024L,
76+
numUploadWorkers = pojo.numObjectLoaders ?: 10,
77+
partSizeBytes = (pojo.partSizeMb ?: 20) * 1024L * 1024L,
8078
useLegacyClient = pojo.useLegacyClient ?: false,
79+
objectSizeBytes = (pojo.totalDataMb ?: 1024) * 1024L * 1024L,
8180
)
8281
}
8382
}

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt

+3-5
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,12 @@ class S3V2Specification :
7676
)
7777
override val fileNamePattern: String? = null
7878

79-
@get:JsonProperty("num_part_workers")
80-
val numPartWorkers: Int? = null
81-
@get:JsonProperty("num_upload_workers")
79+
@get:JsonProperty("num_uploaders")
8280
val numObjectLoaders: Int? = null
8381
@get:JsonProperty("part_size_mb")
8482
val partSizeMb: Int? = null
85-
@get:JsonProperty("max_memory_ratio_reserved_for_parts")
86-
val maxMemoryRatioReservedForParts: Double? = null
83+
@get:JsonProperty("total_data_mb")
84+
val totalDataMb: Int? = null
8785
@get:JsonProperty("use_legacy_client")
8886
val useLegacyClient: Boolean? = null
8987
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
package io.airbyte.integrations.destination.s3_v2

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.file.object_storage.PathFactory
import io.airbyte.cdk.load.file.s3.S3Client
import io.airbyte.cdk.load.task.SelfTerminating
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.TerminalCondition
import io.airbyte.cdk.load.write.WriteOpOverride
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import kotlin.random.Random
import kotlin.time.measureTime
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

/**
 * Throwaway S3 upload throughput harness (this commit is marked "DO NOT MERGE").
 *
 * Bypasses the normal write path entirely: streams copies of a single pseudo-random
 * part buffer to S3 via [config.numUploadWorkers] concurrent multipart uploads (one
 * object per worker), then logs the achieved MB/s.
 */
@Singleton
class S3V2WriteOpOverride(
    private val client: S3Client,
    private val catalog: DestinationCatalog,
    private val config: S3V2Configuration<*>,
    private val pathFactory: PathFactory,
) : WriteOpOverride {
    private val log = KotlinLogging.logger {}

    // No external completion signal: the task is done when execute() returns.
    override val terminalCondition: TerminalCondition = SelfTerminating

    override suspend fun execute() = coroutineScope {
        // Time-seeded so object keys differ across runs and don't collide in the bucket.
        val prng = Random(System.currentTimeMillis())
        // One shared buffer reused for every part; the payload contents are irrelevant.
        val randomPart = prng.nextBytes(config.partSizeBytes.toInt())
        val randomString = randomPart.take(32).joinToString("") { "%02x".format(it) }
        val stream = catalog.streams.first()
        val objectKey = pathFactory.getFinalDirectory(stream) + "/mock-perf-test-$randomString"

        // Integer division twice: every worker uploads the same whole number of parts,
        // so the measured total may be slightly below objectSizeBytes. NOTE(review):
        // if objectSizeBytes < partSizeBytes * numUploadWorkers, partsPerWorker is 0
        // and the test completes empty uploads — confirm configs keep this positive.
        val numParts = (config.objectSizeBytes / config.partSizeBytes).toInt()
        val partsPerWorker = numParts / config.numUploadWorkers
        val actualSizeBytes = partsPerWorker * config.numUploadWorkers * config.partSizeBytes

        log.info {
            "root key=$objectKey; part_size=${config.partSizeBytes}b; num_parts=$numParts (per_worker=$partsPerWorker); total_size=${actualSizeBytes}b; num_workers=${config.numUploadWorkers}"
        }

        val duration = measureTime {
            withContext(Dispatchers.IO) {
                runTest(objectKey, partsPerWorker, randomPart)
            }
        }
        // Fix: the previous inWholeSeconds-based math truncated the elapsed time and
        // divided by zero (-> Infinity) for sub-second runs. Use milliseconds, guarded
        // against a 0 ms reading.
        val elapsedSeconds = duration.inWholeMilliseconds.coerceAtLeast(1) / 1000.0
        val mbs = actualSizeBytes.toDouble() / elapsedSeconds / 1024.0 / 1024.0
        log.info {
            // Duration.toString() already prints its own unit, so no "seconds" suffix.
            "Uploaded $actualSizeBytes bytes in $duration (${"%.2f".format(mbs)} MB/s)"
        }
    }

    /**
     * Launches one multipart upload per worker, each streaming [partsPerWorker]
     * copies of [randomPart] to "<objectKey>-worker-<n>", then completing the upload.
     */
    private suspend fun runTest(
        objectKey: String,
        partsPerWorker: Int,
        randomPart: ByteArray,
    ) = coroutineScope {
        repeat(config.numUploadWorkers) { workerIndex ->
            launch {
                val workerKey = "$objectKey-worker-$workerIndex"
                log.info { "Starting upload to $workerKey" }
                val upload = client.startStreamingUpload(workerKey)
                repeat(partsPerWorker) { partIndex ->
                    // S3 multipart part numbers are 1-based.
                    log.info { "Uploading part ${partIndex + 1} of $workerKey" }
                    upload.uploadPart(randomPart, partIndex + 1)
                }
                log.info { "Completing upload to $workerKey" }
                upload.complete()
            }
        }
    }
}

0 commit comments

Comments (0)