Skip to content

Commit cb1bfbe

Browse files
simple split of DestinationAcceptanceTest (#46689)
just a split of `DestinationAcceptanceTest` so we can have file-based tests and record-based tests. The change was made in such a way that all existing subclass of `DestinationAcceptanceTest` will still work as-is. This Pr was simply split to isolate those changes from functional changes
1 parent dc2c75c commit cb1bfbe

File tree

4 files changed

+296
-225
lines changed

4 files changed

+296
-225
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest|
177178
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty|
178179
| 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas|
179180
| 0.47.0 | 2024-09-26 | [\#42030](https://github.com/airbytehq/airbyte/pull/42030) | minor refactor |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.47.2
1+
version=0.47.3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.standardtest.destination
6+
7+
import com.fasterxml.jackson.databind.JsonNode
8+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
9+
import io.airbyte.commons.features.EnvVariableFeatureFlags
10+
import io.airbyte.commons.features.FeatureFlags
11+
import io.airbyte.commons.features.FeatureFlagsWrapper
12+
import io.airbyte.commons.lang.Exceptions
13+
import io.airbyte.configoss.WorkerDestinationConfig
14+
import io.airbyte.protocol.models.v0.AirbyteMessage
15+
import io.airbyte.protocol.models.v0.AirbyteStateStats
16+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
17+
import io.airbyte.workers.helper.ConnectorConfigUpdater
18+
import io.airbyte.workers.internal.AirbyteDestination
19+
import io.airbyte.workers.internal.DefaultAirbyteDestination
20+
import io.airbyte.workers.process.AirbyteIntegrationLauncher
21+
import io.airbyte.workers.process.DockerProcessFactory
22+
import io.airbyte.workers.process.ProcessFactory
23+
import io.github.oshai.kotlinlogging.KotlinLogging
24+
import java.nio.file.Files
25+
import java.nio.file.Path
26+
import java.util.*
27+
import java.util.function.Consumer
28+
import org.junit.jupiter.api.Assertions
29+
import org.junit.jupiter.api.BeforeEach
30+
import org.mockito.Mockito
31+
32+
private val LOGGER = KotlinLogging.logger {}
33+
34+
@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION")
35+
abstract class BaseDestinationAcceptanceTest(
36+
// If false, ignore counts and only verify the final state message.
37+
protected val verifyIndividualStateAndCounts: Boolean = false,
38+
) {
39+
protected lateinit var processFactory: ProcessFactory
40+
private set
41+
protected lateinit var jobRoot: Path
42+
private set
43+
protected var localRoot: Path? = null
44+
private set
45+
protected lateinit var testEnv: DestinationAcceptanceTest.TestDestinationEnv
46+
private set
47+
protected var fileTransferMountSource: Path? = null
48+
private set
49+
protected open val supportsFileTransfer: Boolean = false
50+
protected var testSchemas: HashSet<String> = HashSet()
51+
protected lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater
52+
private set
53+
protected open val isCloudTest: Boolean = true
54+
protected val featureFlags: FeatureFlags =
55+
if (isCloudTest) {
56+
FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD")
57+
} else {
58+
FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS")
59+
}
60+
protected abstract val imageName: String
61+
/**
62+
* Name of the docker image that the tests will run against.
63+
*
64+
* @return docker image name
65+
*/
66+
get
67+
68+
/**
69+
* Configuration specific to the integration. Will be passed to integration where appropriate in
70+
* each test. Should be valid.
71+
*
72+
* @return integration-specific configuration
73+
*/
74+
@Throws(Exception::class) protected abstract fun getConfig(): JsonNode
75+
76+
protected open fun supportsInDestinationNormalization(): Boolean {
77+
return false
78+
}
79+
80+
protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map<String, String> {
81+
if (shouldNormalize && supportsInDestinationNormalization()) {
82+
return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY")
83+
}
84+
return emptyMap()
85+
}
86+
87+
protected fun getDestinationConfig(
88+
config: JsonNode,
89+
catalog: ConfiguredAirbyteCatalog,
90+
): WorkerDestinationConfig {
91+
return WorkerDestinationConfig()
92+
.withConnectionId(UUID.randomUUID())
93+
.withCatalog(
94+
DestinationAcceptanceTest.convertProtocolObject(
95+
catalog,
96+
io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java
97+
)
98+
)
99+
.withDestinationConnectionConfiguration(config)
100+
}
101+
102+
protected fun runSyncAndVerifyStateOutput(
103+
config: JsonNode,
104+
messages: List<AirbyteMessage>,
105+
catalog: ConfiguredAirbyteCatalog,
106+
runNormalization: Boolean,
107+
) {
108+
runSyncAndVerifyStateOutput(
109+
config,
110+
messages,
111+
catalog,
112+
runNormalization,
113+
imageName,
114+
verifyIndividualStateAndCounts
115+
)
116+
}
117+
118+
@Throws(Exception::class)
119+
protected fun runSyncAndVerifyStateOutput(
120+
config: JsonNode,
121+
messages: List<AirbyteMessage>,
122+
catalog: ConfiguredAirbyteCatalog,
123+
runNormalization: Boolean,
124+
imageName: String,
125+
verifyIndividualStateAndCounts: Boolean
126+
) {
127+
val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName)
128+
129+
var expected = messages.filter { it.type == AirbyteMessage.Type.STATE }
130+
var actual = destinationOutput.filter { it.type == AirbyteMessage.Type.STATE }
131+
132+
if (verifyIndividualStateAndCounts) {
133+
/* Collect the counts and add them to each expected state message */
134+
val stateToCount = mutableMapOf<JsonNode, Int>()
135+
messages.fold(0) { acc, message ->
136+
if (message.type == AirbyteMessage.Type.STATE) {
137+
stateToCount[message.state.global.sharedState] = acc
138+
0
139+
} else {
140+
acc + 1
141+
}
142+
}
143+
144+
expected.forEach { message ->
145+
val clone = message.state
146+
clone.destinationStats =
147+
AirbyteStateStats()
148+
.withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble())
149+
message.state = clone
150+
}
151+
} else {
152+
/* Null the stats and collect only the final messages */
153+
val finalActual =
154+
actual.lastOrNull()
155+
?: throw IllegalArgumentException(
156+
"All message sets used for testing should include a state record"
157+
)
158+
val clone = finalActual.state
159+
clone.destinationStats = null
160+
finalActual.state = clone
161+
162+
expected = listOf(expected.last())
163+
actual = listOf(finalActual)
164+
}
165+
166+
Assertions.assertEquals(expected, actual)
167+
}
168+
169+
@Throws(Exception::class)
170+
open protected fun runSync(
171+
config: JsonNode,
172+
messages: List<AirbyteMessage>,
173+
catalog: ConfiguredAirbyteCatalog,
174+
runNormalization: Boolean,
175+
imageName: String,
176+
): List<AirbyteMessage> {
177+
val destinationConfig = getDestinationConfig(config, catalog)
178+
return runSync(messages, runNormalization, imageName, destinationConfig)
179+
}
180+
181+
@Throws(Exception::class)
182+
protected fun runSync(
183+
messages: List<AirbyteMessage>,
184+
runNormalization: Boolean,
185+
imageName: String,
186+
destinationConfig: WorkerDestinationConfig,
187+
): List<AirbyteMessage> {
188+
val destination = getDestination(imageName)
189+
190+
destination.start(
191+
destinationConfig,
192+
jobRoot,
193+
inDestinationNormalizationFlags(runNormalization)
194+
)
195+
messages.forEach(
196+
Consumer { message: AirbyteMessage ->
197+
Exceptions.toRuntime {
198+
destination.accept(
199+
DestinationAcceptanceTest.convertProtocolObject(
200+
message,
201+
io.airbyte.protocol.models.AirbyteMessage::class.java
202+
)
203+
)
204+
}
205+
}
206+
)
207+
destination.notifyEndOfInput()
208+
209+
val destinationOutput: MutableList<AirbyteMessage> = ArrayList()
210+
while (!destination.isFinished()) {
211+
destination.attemptRead().ifPresent {
212+
destinationOutput.add(
213+
DestinationAcceptanceTest.convertProtocolObject(it, AirbyteMessage::class.java)
214+
)
215+
}
216+
}
217+
218+
destination.close()
219+
220+
return destinationOutput
221+
}
222+
223+
protected fun getDestination(imageName: String): AirbyteDestination {
224+
return DefaultAirbyteDestination(
225+
integrationLauncher =
226+
AirbyteIntegrationLauncher(
227+
DestinationAcceptanceTest.JOB_ID,
228+
DestinationAcceptanceTest.JOB_ATTEMPT,
229+
imageName,
230+
processFactory,
231+
null,
232+
null,
233+
false,
234+
featureFlags
235+
)
236+
)
237+
}
238+
239+
@BeforeEach
240+
@Throws(Exception::class)
241+
open fun setUpInternal() {
242+
val testDir = Path.of("/tmp/airbyte_tests/")
243+
Files.createDirectories(testDir)
244+
val workspaceRoot = Files.createTempDirectory(testDir, "test")
245+
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"))
246+
localRoot = Files.createTempDirectory(testDir, "output")
247+
LOGGER.info { "${"jobRoot: {}"} $jobRoot" }
248+
LOGGER.info { "${"localRoot: {}"} $localRoot" }
249+
testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot)
250+
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
251+
testSchemas = HashSet()
252+
setup(testEnv, testSchemas)
253+
fileTransferMountSource =
254+
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null
255+
256+
processFactory =
257+
DockerProcessFactory(
258+
workspaceRoot,
259+
workspaceRoot.toString(),
260+
localRoot.toString(),
261+
"host",
262+
getConnectorEnv()
263+
)
264+
}
265+
266+
/**
267+
* Function that performs any setup of external resources required for the test. e.g.
268+
* instantiate a postgres database. This function will be called before EACH test.
269+
*
270+
* @param testEnv
271+
* - information about the test environment.
272+
* @param TEST_SCHEMAS
273+
* @throws Exception
274+
* - can throw any exception, test framework will handle.
275+
*/
276+
@Throws(Exception::class)
277+
protected abstract fun setup(
278+
testEnv: DestinationAcceptanceTest.TestDestinationEnv,
279+
TEST_SCHEMAS: HashSet<String>
280+
)
281+
282+
open fun getConnectorEnv(): Map<String, String> {
283+
return emptyMap()
284+
}
285+
}

0 commit comments

Comments
 (0)