Skip to content

Commit a50847a

Browse files
Fail without enqueueing iff the airbyte message type is unrecognized (#40254)
1 parent cd2da8e commit a50847a

File tree

6 files changed

+105
-15
lines changed

6 files changed

+105
-15
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
176+
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
177178
| 0.40.3 | 2024-06-18 | [\#39526](https://github.com/airbytehq/airbyte/pull/39526) | Destinations: INCOMPLETE stream status is a TRANSIENT error rather than SYSTEM |
178179
| 0.40.2 | 2024-06-18 | [\#39552](https://github.com/airbytehq/airbyte/pull/39552) | Destinations: Throw error if the ConfiguredCatalog has no streams |
179180
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,15 @@ constructor(
103103
* do it without touching buffer manager.
104104
*/
105105
val partialAirbyteMessage =
106-
airbyteMessageDeserializer.deserializeAirbyteMessage(
107-
message,
108-
)
106+
try {
107+
airbyteMessageDeserializer.deserializeAirbyteMessage(
108+
message,
109+
)
110+
} catch (e: AirbyteMessageDeserializer.UnrecognizedAirbyteMessageTypeException) {
111+
logger.warn { "Ignoring unrecognized message type: ${e.message}" }
112+
return
113+
}
114+
109115
when (partialAirbyteMessage.type) {
110116
AirbyteMessage.Type.RECORD -> {
111117
validateRecord(partialAirbyteMessage)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt

+38-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package io.airbyte.cdk.integrations.destination.async.deser
55

6+
import com.fasterxml.jackson.databind.exc.ValueInstantiationException
67
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
78
import io.airbyte.commons.json.Jsons
89
import io.airbyte.protocol.models.v0.AirbyteMessage
@@ -13,16 +14,27 @@ private val logger = KotlinLogging.logger {}
1314
class AirbyteMessageDeserializer(
1415
private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
1516
) {
17+
class UnrecognizedAirbyteMessageTypeException(private val unrecognizedType: String) :
18+
Exception(unrecognizedType) {
19+
override fun toString(): String {
20+
return "Could not deserialize AirbyteMessage: unrecognized type: $unrecognizedType"
21+
}
22+
}
23+
1624
/**
17-
* Deserializes to a [PartialAirbyteMessage] which can represent both a Record or a State
18-
* Message
25+
* Deserializes to a [PartialAirbyteMessage] which can represent a Record, State, or Trace
26+
* Message.
27+
*
28+
* Throws on deserialization errors, obfuscating the error message to avoid data leakage. In
29+
* recoverable cases (currently only when the top-level message type is unrecognized), throws a
30+
* dedicated exception.
1931
*
2032
* PartialAirbyteMessage holds either:
2133
* * entire serialized message string when message is a valid State Message
2234
* * serialized AirbyteRecordMessage when message is a valid Record Message
2335
*
2436
* @param message the string to deserialize
25-
* @return PartialAirbyteMessage if the message is valid, empty otherwise
37+
* @return PartialAirbyteMessage if the message is valid
2638
*/
2739
fun deserializeAirbyteMessage(
2840
message: String?,
@@ -32,8 +44,29 @@ class AirbyteMessageDeserializer(
3244
// Use JsonSubTypes and extend StdDeserializer to properly handle this.
3345
// Make immutability a first class citizen in the PartialAirbyteMessage class.
3446
val partial =
35-
Jsons.tryDeserializeExact(message, PartialAirbyteMessage::class.java).orElseThrow {
36-
RuntimeException("Unable to deserialize PartialAirbyteMessage.")
47+
try {
48+
Jsons.deserializeExactUnchecked(message, PartialAirbyteMessage::class.java)
49+
} catch (e: ValueInstantiationException) {
50+
// This is a hack to catch unrecognized message types. Jackson supports
51+
// the equivalent via annotations, but we cannot use them because the
52+
// AirbyteMessage
53+
// is generated from json-schema.
54+
val pat =
55+
Regex("Cannot construct instance of .*AirbyteMessage.Type., problem: ([_A-Z]+)")
56+
val match = pat.find(e.message!!)
57+
if (match != null) {
58+
val unrecognized = match.groups[1]?.value
59+
logger.warn { "Unrecognized message type: $unrecognized" }
60+
throw UnrecognizedAirbyteMessageTypeException(unrecognized!!)
61+
} else {
62+
val obfuscated = Jsons.obfuscateDeserializationException(e)
63+
throw RuntimeException(
64+
"ValueInstantiationException when deserializing PartialAirbyteMessage: $obfuscated"
65+
)
66+
}
67+
} catch (e: Exception) {
68+
val obfuscated = Jsons.obfuscateDeserializationException(e)
69+
throw RuntimeException("Could not deserialize PartialAirbyteMessage: $obfuscated")
3770
}
3871

3972
val msgType = partial.type
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.40.3
1+
version=0.40.4

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+35
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import java.util.function.Consumer
4242
import java.util.stream.Stream
4343
import org.apache.commons.lang3.RandomStringUtils
4444
import org.junit.jupiter.api.Assertions.assertEquals
45+
import org.junit.jupiter.api.Assertions.assertFalse
4546
import org.junit.jupiter.api.Assertions.assertThrows
4647
import org.junit.jupiter.api.Assertions.assertTrue
4748
import org.junit.jupiter.api.BeforeEach
@@ -732,4 +733,38 @@ class AsyncStreamConsumerTest {
732733
}
733734
assertEquals(expRecords, actualRecords)
734735
}
736+
737+
@Test
738+
internal fun deserializeAirbyteMessageWithUnrecognizedType() {
739+
val airbyteMessage = AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
740+
val serialized = Jsons.serialize(airbyteMessage)
741+
// Fake an upstream protocol change
742+
val retyped =
743+
serialized.replace(AirbyteMessage.Type.RECORD.toString(), "__UNKNOWN_TYPE_OF_MESSAGE__")
744+
// Assert that this doesn't throw an exception
745+
consumer.start()
746+
assertDoesNotThrow { consumer.accept(retyped, retyped.length) }
747+
}
748+
749+
@Test
750+
internal fun deserializeAirbyteMessageWithUnrecognizedNonTypeEnum() {
751+
// NOTE: We are only guaranteeing failure on the top-level message type. Anything else
752+
// should break deserialization and result in an *obfuscated* error message.
753+
val airbyteMessage =
754+
AirbyteMessage()
755+
.withType(AirbyteMessage.Type.RECORD)
756+
.withState(
757+
AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.STREAM)
758+
)
759+
val serialized = Jsons.serialize(airbyteMessage)
760+
// Fake an upstream protocol change
761+
val offender = "__UNKNOWN_NONTYPE_ENUM__"
762+
val retyped = serialized.replace("STREAM", offender)
763+
// Assert that this DOES throw: only an unrecognized top-level message type is ignored
764+
consumer.start()
765+
val throwable =
766+
assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) }
767+
// Ensure that the offending data has been scrubbed from the error message
768+
assertFalse(throwable.message!!.contains(offender))
769+
}
735770
}

airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt

+20-5
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ object Jsons {
139139
}
140140
}
141141

142+
// WARNING: This method throws bare exceptions on parse failure which might
143+
// leak sensitive data. Use obfuscateDeserializationException() to strip
144+
// the sensitive data before logging.
145+
@JvmStatic
146+
fun <T : Any> deserializeExactUnchecked(jsonString: String?, klass: Class<T>?): T {
147+
return OBJECT_MAPPER_EXACT.readValue(jsonString, klass)
148+
}
149+
142150
@JvmStatic
143151
fun <T : Any> tryDeserialize(jsonString: String, klass: Class<T>): Optional<T> {
144152
return try {
@@ -425,9 +433,17 @@ object Jsons {
425433
* potentially-sensitive information. </snip...>
426434
*/
427435
private fun <T : Any> handleDeserThrowable(throwable: Throwable): Optional<T> {
428-
// Manually build the stacktrace, excluding the top-level exception object
429-
// so that we don't accidentally include the exception message.
430-
// Otherwise we could just do ExceptionUtils.getStackTrace(t).
436+
val obfuscated = obfuscateDeserializationException(throwable)
437+
LOGGER.warn { "Failed to deserialize json due to $obfuscated" }
438+
return Optional.empty()
439+
}
440+
441+
/**
442+
* Build a stacktrace from the given throwable, enabling us to log or rethrow without leaking
443+
* sensitive information in the exception message (which would be exposed with, e.g.,
444+
* ExceptionUtils.getStackTrace(t).)
445+
*/
446+
fun obfuscateDeserializationException(throwable: Throwable): String {
431447
var t: Throwable = throwable
432448
val sb = StringBuilder()
433449
sb.append(t.javaClass)
@@ -444,8 +460,7 @@ object Jsons {
444460
sb.append(traceElement.toString())
445461
}
446462
}
447-
LOGGER.warn { "Failed to deserialize json due to $sb" }
448-
return Optional.empty()
463+
return sb.toString()
449464
}
450465

451466
/**

0 commit comments

Comments
 (0)