-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Destination-S3] Uses New Load CDK LoadStrategy Interface (temporarily disabled) #54695
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.load.pipeline | ||
|
||
import io.airbyte.cdk.load.message.DestinationRecordRaw | ||
import kotlin.random.Random | ||
|
||
/** | ||
* Declare a singleton of this type to have input distributed evenly across the input partitions. | ||
* (The default is to [ByStreamInputPartitioner].) | ||
* | ||
* [rotateEveryNRecords] determines how often to rotate to the next partition. In testing, 10_000 | ||
* seems to be the sweet spot between too much context switching and not enough load balancing. | ||
*/ | ||
open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) : | ||
InputPartitioner { | ||
private var nextPartition = | ||
Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) * | ||
rotateEveryNRecords | ||
|
||
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int { | ||
val part = nextPartition++ / rotateEveryNRecords | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this needs to be threadsafe? Something like: private val counter = AtomicInteger(
Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) * rotateEveryNRecords
)
// and then in getPartition:
val currentValue = counter.getAndIncrement()
val part = currentValue / rotateEveryNRecords There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One instance of a partitioner should be in use by any given coroutine, and getPartition isn't a suspend function, so it should be fine. There might be some ambiguity around the fact that it gets applies inside a fold over a flow, which is a suspend function. I could be extra safe and add it to the state that gets threaded through |
||
return Math.floorMod(part, numParts) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you are missing something here?