Skip to content

Commit 78fb096

Browse files
committed
Status quo
1 parent 35cd3c3 commit 78fb096

File tree

1 file changed

+6
-3
lines changed
  • airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake

1 file changed

+6
-3
lines changed

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.Dedupe
88
import io.airbyte.cdk.load.command.DestinationCatalog
99
import io.airbyte.cdk.load.data.ObjectValue
1010
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
11+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1112
import io.airbyte.cdk.load.pipeline.InputPartitioner
1213
import jakarta.inject.Singleton
1314
import kotlin.math.abs
@@ -25,14 +26,16 @@ class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner {
2526
}
2627
private val random = Random(System.currentTimeMillis())
2728

28-
override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
29+
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
30+
val recordAirbyteValue = record.asDestinationRecordAirbyteValue()
31+
2932
if (numParts == 1) {
3033
return 0
3134
}
3235

33-
streamToPrimaryKeyFieldNames[record.stream]?.let { primaryKey ->
36+
streamToPrimaryKeyFieldNames[recordAirbyteValue.stream]?.let { primaryKey ->
3437
val primaryKeyValues =
35-
primaryKey.map { it.map { key -> (record.data as ObjectValue).values[key] } }
38+
primaryKey.map { it.map { key -> (recordAirbyteValue.data as ObjectValue).values[key] } }
3639
val hash = primaryKeyValues.hashCode()
3740
/** abs(MIN_VALUE) == MIN_VALUE, so we need to handle this case separately */
3841
if (hash == Int.MIN_VALUE) {

0 commit comments

Comments
 (0)