Skip to content

Commit a1ad68f

Browse files
committed
DevNullDirectLoader fix
1 parent 2c3a697 commit a1ad68f

File tree

1 file changed

+7
-7
lines changed
  • airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null

1 file changed

+7
-7
lines changed

airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullDirectLoader.kt

+7-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.airbyte.integrations.destination.dev_null
66

77
import io.airbyte.cdk.load.command.DestinationStream
8-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
8+
import io.airbyte.cdk.load.message.DestinationRecordRaw
99
import io.airbyte.cdk.load.write.DirectLoader
1010
import io.airbyte.cdk.load.write.DirectLoaderFactory
1111
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -18,9 +18,9 @@ abstract class DevNullDirectLoader(
1818
) : DirectLoader {
1919
private var recordCount: Long = 0L
2020

21-
abstract fun acceptInner(record: DestinationRecordAirbyteValue)
21+
abstract fun acceptInner(record: DestinationRecordRaw)
2222

23-
override fun accept(record: DestinationRecordAirbyteValue): DirectLoader.DirectLoadResult {
23+
override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
2424
acceptInner(record)
2525
return if (++recordCount % config.ackRatePerRecord == 0L) {
2626
DirectLoader.Complete
@@ -87,7 +87,7 @@ class LoggingDirectLoader(
8787
private var logCount: Long = 0L
8888
private val prng: Random = loggingConfig.seed?.let { Random(it) } ?: Random.Default
8989

90-
override fun acceptInner(record: DestinationRecordAirbyteValue) {
90+
override fun acceptInner(record: DestinationRecordRaw) {
9191
if (recordCount++ % loggingConfig.logEvery == 0L) {
9292
if (loggingConfig.sampleRate == 1.0 || prng.nextDouble() < loggingConfig.sampleRate) {
9393
if (++logCount < loggingConfig.maxEntryCount) {
@@ -101,15 +101,15 @@ class LoggingDirectLoader(
101101
}
102102

103103
class SilentDirectLoader(config: DevNullConfiguration) : DevNullDirectLoader(config) {
104-
override fun acceptInner(record: DestinationRecordAirbyteValue) {
104+
override fun acceptInner(record: DestinationRecordRaw) {
105105
/* Do nothing */
106106
}
107107
}
108108

109109
class ThrottledDirectLoader(config: DevNullConfiguration, private val millisPerRecord: Long) :
110110
DevNullDirectLoader(config) {
111111

112-
override fun acceptInner(record: DestinationRecordAirbyteValue) {
112+
override fun acceptInner(record: DestinationRecordRaw) {
113113
Thread.sleep(millisPerRecord)
114114
}
115115
}
@@ -123,7 +123,7 @@ class FailingDirectLoader(
123123

124124
private var messageCount: Long = 0L
125125

126-
override fun acceptInner(record: DestinationRecordAirbyteValue) {
126+
override fun acceptInner(record: DestinationRecordRaw) {
127127
if (messageCount++ > numMessages) {
128128
val message =
129129
"Failing Destination(stream=$stream, numMessages=$numMessages: failing at $record)"

0 commit comments

Comments
 (0)