Skip to content

Commit 408dd0c

Browse files
committed
partitioning without using airbyteValues
1 parent 78fb096 commit 408dd0c

File tree

2 files changed

+12
-7
lines changed
  • airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message
  • airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake

2 files changed

+12
-7
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ data class EnrichedDestinationRecordAirbyteValue(
183183

184184
data class DestinationRecordRaw(
185185
val stream: DestinationStream.Descriptor,
186-
private val rawData: AirbyteMessage,
187-
private val serialized: String
186+
val rawData: AirbyteMessage,
187+
val serialized: String
188188
) {
189189
fun asDestinationRecordAirbyteValue(): DestinationRecordAirbyteValue {
190190
return DestinationRecordAirbyteValue(

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt

+10-5
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
package io.airbyte.integrations.destination.s3_data_lake
66

7+
import com.fasterxml.jackson.databind.JsonNode
78
import io.airbyte.cdk.load.command.Dedupe
89
import io.airbyte.cdk.load.command.DestinationCatalog
910
import io.airbyte.cdk.load.data.ObjectValue
11+
import io.airbyte.cdk.load.data.json.toAirbyteValue
1012
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
1113
import io.airbyte.cdk.load.message.DestinationRecordRaw
1214
import io.airbyte.cdk.load.pipeline.InputPartitioner
@@ -27,15 +29,18 @@ class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner {
2729
private val random = Random(System.currentTimeMillis())
2830

2931
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
30-
val recordAirbyteValue = record.asDestinationRecordAirbyteValue()
31-
3232
if (numParts == 1) {
3333
return 0
3434
}
3535

36-
streamToPrimaryKeyFieldNames[recordAirbyteValue.stream]?.let { primaryKey ->
37-
val primaryKeyValues =
38-
primaryKey.map { it.map { key -> (recordAirbyteValue.data as ObjectValue).values[key] } }
36+
streamToPrimaryKeyFieldNames[record.stream]?.let { primaryKey ->
37+
val jsonData = record.rawData.record.data as JsonNode
38+
39+
val primaryKeyValues = primaryKey.map { keys ->
40+
keys.map { key ->
41+
if (jsonData.has(key)) jsonData.get(key) else null
42+
}
43+
}
3944
val hash = primaryKeyValues.hashCode()
4045
/** abs(MIN_VALUE) == MIN_VALUE, so we need to handle this case separately */
4146
if (hash == Int.MIN_VALUE) {

0 commit comments

Comments
 (0)