Skip to content

Commit 4417769

Browse files
authored
[cdk, source-postgres, source-mysql] a new error handling and translation framework (#40208)
Fixes airbytehq/airbyte-internal-issues#8516 This set of changes mainly moves error translation to be part of each connector. In general, each connector needs to implement its own error translation class that inherits from the abstract class ConnectorExceptionTranslator, which is part of the CDK. By implementing, it means the connector developer or our support will populate the error dictionary with error samples with matching rules (e.g., regex). See the example we created for the Postgres source.
1 parent 824b79c commit 4417769

File tree

20 files changed

+781
-648
lines changed

20 files changed

+781
-648
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+268-268
Large diffs are not rendered by default.

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt

+12-57
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ import com.google.common.annotations.VisibleForTesting
88
import com.google.common.base.Preconditions
99
import com.google.common.collect.Lists
1010
import datadog.trace.api.Trace
11-
import io.airbyte.cdk.integrations.util.ApmTraceUtils
12-
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
11+
import io.airbyte.cdk.integrations.util.ConnectorExceptionHandler
1312
import io.airbyte.cdk.integrations.util.concurrent.ConcurrentStreamConsumer
1413
import io.airbyte.commons.features.EnvVariableFeatureFlags
1514
import io.airbyte.commons.features.FeatureFlags
@@ -110,17 +109,24 @@ internal constructor(
110109

111110
@Trace(operationName = "RUN_OPERATION")
112111
@Throws(Exception::class)
113-
fun run(args: Array<String>) {
112+
@JvmOverloads
113+
fun run(
114+
args: Array<String>,
115+
exceptionHandler: ConnectorExceptionHandler = ConnectorExceptionHandler()
116+
) {
114117
val parsed = cliParser.parse(args)
115118
try {
116-
runInternal(parsed)
119+
runInternal(parsed, exceptionHandler)
117120
} catch (e: Exception) {
118121
throw e
119122
}
120123
}
121124

122125
@Throws(Exception::class)
123-
private fun runInternal(parsed: IntegrationConfig) {
126+
private fun runInternal(
127+
parsed: IntegrationConfig,
128+
exceptionHandler: ConnectorExceptionHandler
129+
) {
124130
LOGGER.info { "Running integration: ${integration.javaClass.name}" }
125131
LOGGER.info { "Command: ${parsed.command}" }
126132
LOGGER.info { "Integration config: $parsed" }
@@ -213,59 +219,8 @@ internal constructor(
213219
}
214220
}
215221
} catch (e: Exception) {
216-
LOGGER.error(e) { "caught exception!" }
217-
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An
218-
// attempt is made
219-
// to
220-
// find the root exception that corresponds to a configuration error. If that does not
221-
// exist, we
222-
// just return the original exception.
223-
ApmTraceUtils.addExceptionToTrace(e)
224-
val rootConfigErrorThrowable = ConnectorExceptionUtil.getRootConfigError(e)
225-
val rootTransientErrorThrowable = ConnectorExceptionUtil.getRootTransientError(e)
226-
// If the source connector throws a config error, a trace message with the relevant
227-
// message should
228-
// be surfaced.
229-
if (parsed.command == Command.CHECK) {
230-
// Currently, special handling is required for the CHECK case since the user display
231-
// information in
232-
// the trace message is
233-
// not properly surfaced to the FE. In the future, we can remove this and just throw
234-
// an exception.
235-
outputRecordCollector.accept(
236-
AirbyteMessage()
237-
.withType(AirbyteMessage.Type.CONNECTION_STATUS)
238-
.withConnectionStatus(
239-
AirbyteConnectionStatus()
240-
.withStatus(AirbyteConnectionStatus.Status.FAILED)
241-
.withMessage(
242-
ConnectorExceptionUtil.getDisplayMessage(
243-
rootConfigErrorThrowable
244-
)
245-
)
246-
)
247-
)
248-
return
249-
}
250-
251-
if (ConnectorExceptionUtil.isConfigError(rootConfigErrorThrowable)) {
252-
AirbyteTraceMessageUtility.emitConfigErrorTrace(
253-
e,
254-
ConnectorExceptionUtil.getDisplayMessage(rootConfigErrorThrowable),
255-
)
256-
// On receiving a config error, the container should be immediately shut down.
257-
System.exit(1)
258-
} else if (ConnectorExceptionUtil.isTransientError(rootTransientErrorThrowable)) {
259-
AirbyteTraceMessageUtility.emitTransientErrorTrace(
260-
e,
261-
ConnectorExceptionUtil.getDisplayMessage(rootTransientErrorThrowable)
262-
)
263-
// On receiving a transient error, the container should be immediately shut down.
264-
System.exit(1)
265-
}
266-
throw e
222+
exceptionHandler.handleException(e, parsed.command, outputRecordCollector)
267223
}
268-
269224
LOGGER.info { "Completed integration: ${integration.javaClass.name}" }
270225
}
271226

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk.integrations.util
5+
6+
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
7+
import io.airbyte.cdk.integrations.base.Command
8+
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage
9+
import io.airbyte.commons.exceptions.ConfigErrorException
10+
import io.airbyte.commons.exceptions.ConnectionErrorException
11+
import io.airbyte.commons.exceptions.TransientErrorException
12+
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
13+
import io.airbyte.protocol.models.v0.AirbyteMessage
14+
import io.github.oshai.kotlinlogging.KotlinLogging
15+
import java.util.function.Consumer
16+
import java.util.regex.Pattern
17+
import java.util.regex.PatternSyntaxException
18+
import kotlin.system.exitProcess
19+
import org.jetbrains.annotations.VisibleForTesting
20+
21+
private val LOGGER = KotlinLogging.logger {}
22+
23+
enum class FailureType {
24+
CONFIG,
25+
TRANSIENT
26+
}
27+
28+
data class ConnectorErrorProfile(
29+
val errorClass: String,
30+
val regexMatchingPattern: String,
31+
val failureType: FailureType,
32+
val externalMessage: String,
33+
val sampleInternalMessage: String,
34+
val referenceLinks: List<String> = emptyList(),
35+
) {
36+
init {
37+
require(isValidRegex(regexMatchingPattern)) {
38+
"regexMatchingPattern is not a valid regular expression string"
39+
}
40+
require(externalMessage.isNotBlank()) { "externalMessage must not be blank" }
41+
require(sampleInternalMessage.isNotBlank()) { "sampleInternalMessage must not be blank" }
42+
}
43+
44+
private fun isValidRegex(regexString: String): Boolean {
45+
return try {
46+
Pattern.compile(regexString)
47+
true
48+
} catch (e: PatternSyntaxException) {
49+
false
50+
}
51+
}
52+
}
53+
54+
/**
55+
* This class defines interfaces that will be implemented by individual connectors for translating
56+
* internal exception error messages to external user-friendly error messages.
57+
*/
58+
open class ConnectorExceptionHandler {
59+
private val COMMON_EXCEPTION_MESSAGE_TEMPLATE: String =
60+
"Could not connect with provided configuration. Error: %s"
61+
62+
protected open val connectorErrorDictionary: MutableList<ConnectorErrorProfile> =
63+
mutableListOf()
64+
65+
init {
66+
initializeErrorDictionary()
67+
}
68+
69+
/**
70+
* Handles exceptions thrown by the connector. This method is the main entrance for handling
71+
* exceptions thrown by the connector. It checks if the exception is a known exception, and if
72+
* so, it emits the appropriate trace and external user-friendly error message. If the exception
73+
* is not known, it rethrows the exception, which becomes a system error.
74+
*/
75+
fun handleException(
76+
e: Throwable,
77+
cmd: Command,
78+
outputRecordCollector: Consumer<AirbyteMessage>
79+
) {
80+
LOGGER.error(e) { "caught exception!" }
81+
ApmTraceUtils.addExceptionToTrace(e)
82+
val rootException: Throwable = getRootException(e)
83+
val externalMessage: String? = getExternalMessage(rootException)
84+
/* error messages generated during check() needs special handling */
85+
if (cmd == Command.CHECK) {
86+
outputRecordCollector.accept(
87+
AirbyteMessage()
88+
.withType(AirbyteMessage.Type.CONNECTION_STATUS)
89+
.withConnectionStatus(
90+
AirbyteConnectionStatus()
91+
.withStatus(AirbyteConnectionStatus.Status.FAILED)
92+
.withMessage(externalMessage),
93+
),
94+
)
95+
} else {
96+
if (checkErrorType(rootException, FailureType.CONFIG)) {
97+
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, externalMessage)
98+
exitProcess(1)
99+
} else if (checkErrorType(rootException, FailureType.TRANSIENT)) {
100+
AirbyteTraceMessageUtility.emitTransientErrorTrace(e, externalMessage)
101+
exitProcess(1)
102+
}
103+
throw e
104+
}
105+
}
106+
107+
/**
108+
* Initializes the error dictionary for the connector. This method shall include all the errors
109+
* that are shared by all connectors.
110+
*/
111+
open fun initializeErrorDictionary() {}
112+
113+
/**
114+
* Translates an internal exception message to an external user-friendly message. This is the
115+
* main entrance of the error translation process.
116+
*/
117+
fun getExternalMessage(e: Throwable?): String? {
118+
// some common translations that every connector would share can be done here
119+
if (e is ConfigErrorException) {
120+
return e.displayMessage
121+
} else if (e is TransientErrorException) {
122+
return e.message
123+
} else if (e is ConnectionErrorException) {
124+
return ErrorMessage.getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e)
125+
} else {
126+
val msg = translateConnectorSpecificErrorMessage(e)
127+
if (msg != null) return msg
128+
}
129+
// if no specific translation is found, return a generic message
130+
return String.format(
131+
COMMON_EXCEPTION_MESSAGE_TEMPLATE,
132+
if (e!!.message != null) e.message else "",
133+
)
134+
}
135+
136+
fun add(errorProfile: ConnectorErrorProfile) {
137+
connectorErrorDictionary.add(errorProfile)
138+
}
139+
140+
/**
141+
* Translates a connector specific error message to an external user-friendly message. This
142+
* method should be implemented by individual connectors that wish to translate connector
143+
* specific error messages.
144+
*/
145+
open fun translateConnectorSpecificErrorMessage(e: Throwable?): String? {
146+
if (e == null) return null
147+
for (error in connectorErrorDictionary) {
148+
if (e.message?.lowercase()?.matches(error.regexMatchingPattern.toRegex())!!)
149+
return error.externalMessage
150+
}
151+
return null
152+
}
153+
154+
/**
155+
* Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is
156+
* made to find the root exception that corresponds to a configuration error. If that does not
157+
* exist, we just return the original exception.
158+
*/
159+
@VisibleForTesting
160+
internal fun getRootException(e: Throwable): Throwable {
161+
var current: Throwable? = e
162+
while (current != null) {
163+
if (isRecognizableError(current)) {
164+
return current
165+
} else {
166+
current = current.cause
167+
}
168+
}
169+
return e
170+
}
171+
172+
private fun checkErrorType(e: Throwable?, failureType: FailureType?): Boolean {
173+
for (error in connectorErrorDictionary) {
174+
if (
175+
error.failureType == failureType &&
176+
e!!.message?.matches(error.regexMatchingPattern.toRegex())!!
177+
)
178+
return true
179+
}
180+
return false
181+
}
182+
183+
/*
184+
* Checks if the error can be recognized. A recognizable error is either
185+
* a known transient exception, a config exception, or an exception whose error messages have been
186+
* stored as part of the error profile in the error dictionary.
187+
* */
188+
private fun isRecognizableError(e: Throwable?): Boolean {
189+
if (e == null) return false
190+
if (e is TransientErrorException || e is ConfigErrorException) {
191+
return true
192+
}
193+
for (error in connectorErrorDictionary) {
194+
if (e.message?.matches(error.regexMatchingPattern.toRegex())!!) return true
195+
}
196+
return false
197+
}
198+
}

0 commit comments

Comments
 (0)