Skip to content

Commit 7c159b8

Browse files
chore: auto-fix lint and format issues
1 parent c62069f commit 7c159b8

File tree

10 files changed

+8
-28
lines changed

10 files changed

+8
-28
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
99
import io.airbyte.cdk.load.command.DestinationStream
1010
import io.airbyte.cdk.load.message.BatchEnvelope
1111
import io.airbyte.cdk.load.message.ChannelMessageQueue
12-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
1312
import io.airbyte.cdk.load.message.DestinationRecordRaw
1413
import io.airbyte.cdk.load.message.MultiProducerChannel
1514
import io.airbyte.cdk.load.message.PartitionedQueue

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,7 @@ data class DestinationRecord(
145145
)
146146
}
147147
fun asDestinationRecordRaw(): DestinationRecordRaw {
148-
return DestinationRecordRaw(
149-
stream,
150-
message,
151-
serialized
152-
)
148+
return DestinationRecordRaw(stream, message, serialized)
153149
}
154150
}
155151

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.cdk.load.pipeline
66

7-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
87
import io.airbyte.cdk.load.message.DestinationRecordRaw
98
import io.airbyte.cdk.load.message.PartitionedQueue
109
import io.airbyte.cdk.load.message.PipelineEvent
@@ -25,8 +24,7 @@ import jakarta.inject.Singleton
2524
class DirectLoadPipelineStep<S : DirectLoader>(
2625
val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
2726
@Named("recordQueue")
28-
val inputQueue:
29-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
27+
val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
3028
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
3129
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
3230
val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt

+1-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.load.pipeline
66

77
import io.airbyte.cdk.load.message.Batch
8-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
98
import io.airbyte.cdk.load.message.DestinationRecordRaw
109
import io.airbyte.cdk.load.message.WithBatchState
1110
import io.airbyte.cdk.load.message.WithStream
@@ -31,10 +30,7 @@ class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
3130
return directLoaderFactory.create(key.stream, part)
3231
}
3332

34-
override fun accept(
35-
record: DestinationRecordRaw,
36-
state: S
37-
): Pair<S, DirectLoadAccResult?> {
33+
override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
3834
state.accept(record).let {
3935
return when (it) {
4036
is Incomplete -> Pair(state, null)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.cdk.load.pipeline
66

7-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
87
import io.airbyte.cdk.load.message.DestinationRecordRaw
98
import io.micronaut.context.annotation.Secondary
109
import jakarta.inject.Singleton

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import io.airbyte.cdk.load.command.DestinationStream
1111
import io.airbyte.cdk.load.message.BatchEnvelope
1212
import io.airbyte.cdk.load.message.ChannelMessageQueue
1313
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
14-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
1514
import io.airbyte.cdk.load.message.DestinationRecordRaw
1615
import io.airbyte.cdk.load.message.DestinationStreamEvent
1716
import io.airbyte.cdk.load.message.MessageQueue

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import io.airbyte.cdk.load.message.DestinationFile
1515
import io.airbyte.cdk.load.message.DestinationFileStreamComplete
1616
import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete
1717
import io.airbyte.cdk.load.message.DestinationRecord
18-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
1918
import io.airbyte.cdk.load.message.DestinationRecordRaw
2019
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
2120
import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package io.airbyte.cdk.load.write
66

77
import io.airbyte.cdk.load.command.DestinationStream
8-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
98
import io.airbyte.cdk.load.message.DestinationRecordRaw
109

1110
/**

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import io.airbyte.cdk.load.command.DestinationCatalog
88
import io.airbyte.cdk.load.command.DestinationStream
99
import io.airbyte.cdk.load.data.MapperPipeline
1010
import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
11-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
1211
import io.airbyte.cdk.load.message.DestinationRecordRaw
1312
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory
1413
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
@@ -92,7 +91,8 @@ class S3DataLakeDirectLoader(
9291
)
9392
writer.write(icebergRecord)
9493

95-
dataSize += recordAirbyteValue.serializedSizeBytes // TODO: use icebergRecord.size() instead?
94+
dataSize +=
95+
recordAirbyteValue.serializedSizeBytes // TODO: use icebergRecord.size() instead?
9696
if (dataSize < batchSize) {
9797
return DirectLoader.Incomplete
9898
}

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakePartitioner.kt

+3-8
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@
44

55
package io.airbyte.integrations.destination.s3_data_lake
66

7-
import com.fasterxml.jackson.databind.JsonNode
87
import io.airbyte.cdk.load.command.Dedupe
98
import io.airbyte.cdk.load.command.DestinationCatalog
10-
import io.airbyte.cdk.load.data.ObjectValue
11-
import io.airbyte.cdk.load.data.json.toAirbyteValue
12-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
139
import io.airbyte.cdk.load.message.DestinationRecordRaw
1410
import io.airbyte.cdk.load.pipeline.InputPartitioner
1511
import jakarta.inject.Singleton
@@ -36,11 +32,10 @@ class S3DataLakePartitioner(catalog: DestinationCatalog) : InputPartitioner {
3632
streamToPrimaryKeyFieldNames[record.stream]?.let { primaryKey ->
3733
val jsonData = record.asRawJson()
3834

39-
val primaryKeyValues = primaryKey.map { keys ->
40-
keys.map { key ->
41-
if (jsonData.has(key)) jsonData.get(key) else null
35+
val primaryKeyValues =
36+
primaryKey.map { keys ->
37+
keys.map { key -> if (jsonData.has(key)) jsonData.get(key) else null }
4238
}
43-
}
4439
val hash = primaryKeyValues.hashCode()
4540
/** abs(MIN_VALUE) == MIN_VALUE, so we need to handle this case separately */
4641
if (hash == Int.MIN_VALUE) {

0 commit comments

Comments
 (0)