Skip to content

Commit 34e4fe0

Browse files
destination-postgres: implement refreshes (#41954)
Implementing refreshes for destination-postgres. We're bumping the CDK version to the latest and modifying a whole lot of jsonl files for tests (in both regular and strict-encrypt).
1 parent 45410b4 commit 34e4fe0

File tree

87 files changed

+868
-316
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+868
-316
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+7
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,13 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.43.0 | 2024-07-17 | [\#41954](https://github.com/airbytehq/airbyte/pull/41954) | fix refreshes for connectors using the old SqlOperations |
178+
| 0.43.0 | 2024-07-17 | [\#42017](https://github.com/airbytehq/airbyte/pull/42017) | bump postgres-jdbc version |
179+
| 0.43.0 | 2024-07-17 | [\#42015](https://github.com/airbytehq/airbyte/pull/42015) | wait until migration before creating the Writeconfig objects |
180+
| 0.43.0 | 2024-07-17 | [\#41953](https://github.com/airbytehq/airbyte/pull/41953) | add generationId and syncId to SqlOperations functions |
181+
| 0.43.0 | 2024-07-17 | [\#41952](https://github.com/airbytehq/airbyte/pull/41952) | rename and add fields in WriteConfig |
182+
| 0.43.0 | 2024-07-17 | [\#41951](https://github.com/airbytehq/airbyte/pull/41951) | remove nullables in JdbcBufferedConsumerFactory |
183+
| 0.43.0 | 2024-07-17 | [\#41950](https://github.com/airbytehq/airbyte/pull/41950) | remove unused classes |
177184
| 0.42.2 | 2024-07-21 | [\#42122](https://github.com/airbytehq/airbyte/pull/42122) | Support for Debezium resync and shutdown scenarios. |
178185
| 0.42.2 | 2024-07-04 | [\#40208](https://github.com/airbytehq/airbyte/pull/40208) | Implement a new connector error handling and translation framework |
179186
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt

+20-2
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,27 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
3333
@Throws(SQLException::class)
3434
abstract fun execute(query: CheckedConsumer<Connection, SQLException>)
3535

36-
@Throws(SQLException::class)
36+
/**
37+
* We can't define a default parameter in the method below because "An overriding function is
38+
* not allowed to specify default values for its parameters" in Kotlin. And the interface could
39+
* have a default parameter, but is not allowed an @JvmOverloads because it's abstract. So for
40+
* Java compat, we have 2 functions, the same way we would in Java.
41+
*/
3742
override fun execute(sql: String?) {
38-
execute { connection: Connection -> connection.createStatement().execute(sql) }
43+
execute(sql, true)
44+
}
45+
46+
@Throws(SQLException::class)
47+
fun execute(sql: String?, logStatements: Boolean) {
48+
execute { connection: Connection ->
49+
if (logStatements) {
50+
LOGGER.info("executing statement: $sql")
51+
}
52+
connection.createStatement().execute(sql)
53+
if (logStatements) {
54+
LOGGER.info("statement successfully executed")
55+
}
56+
}
3957
}
4058

4159
@Throws(SQLException::class)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt

+17-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ interface SqlOperations {
117117
database: JdbcDatabase?,
118118
schemaName: String?,
119119
sourceTableName: String?,
120-
destinationTableName: String?
120+
destinationTableName: String?,
121121
): String?
122122

123123
/**
@@ -132,6 +132,22 @@ interface SqlOperations {
132132
/** Check if the data record is valid and ok to be written to destination */
133133
fun isValidData(data: JsonNode?): Boolean
134134

135+
/**
136+
* check if there's any row in table {@code rawNamespace.rawName} for which the value of the
137+
* _airbyte_generation_id column is different from {@code generationId}
138+
*
139+
* @return true if the table exists and contains such a row, false otherwise
140+
*/
141+
fun isOtherGenerationIdInTable(
142+
database: JdbcDatabase,
143+
generationId: Long,
144+
rawNamespace: String,
145+
rawName: String
146+
): Boolean
147+
148+
/** overwrite the raw table with the temporary raw table */
149+
fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String)
150+
135151
/**
136152
* Denotes whether the destination has the concept of schema or not
137153
*
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.42.4
1+
version=0.43.0

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+69-12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
2020
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
2121
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter
2222
import io.airbyte.commons.json.Jsons
23+
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
2324
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
2425
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2526
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
@@ -81,8 +82,9 @@ object JdbcBufferedConsumerFactory {
8182
namingResolver,
8283
parsedCatalog
8384
),
84-
onCloseFunction(typerDeduper),
85+
onCloseFunction(database, sqlOperations, parsedCatalog, typerDeduper),
8586
JdbcInsertFlushFunction(
87+
defaultNamespace,
8688
recordWriterFunction(database, sqlOperations, writeConfigs, catalog),
8789
optimalBatchSizeBytes
8890
),
@@ -95,12 +97,28 @@ object JdbcBufferedConsumerFactory {
9597
}
9698

9799
private fun createWriteConfigs(
100+
database: JdbcDatabase,
101+
sqlOperations: SqlOperations,
98102
namingResolver: NamingConventionTransformer,
99103
parsedCatalog: ParsedCatalog,
100104
): List<WriteConfig> {
101-
return parsedCatalog.streams
102-
.map { parsedStreamToWriteConfig(namingResolver, rawTableSuffix = "").apply(it) }
103-
.toList()
105+
return parsedCatalog.streams.map {
106+
val rawSuffix: String =
107+
if (
108+
it.minimumGenerationId == it.generationId &&
109+
sqlOperations.isOtherGenerationIdInTable(
110+
database,
111+
it.generationId,
112+
it.id.rawNamespace,
113+
it.id.rawName
114+
)
115+
) {
116+
AbstractStreamOperation.TMP_TABLE_SUFFIX
117+
} else {
118+
AbstractStreamOperation.NO_SUFFIX
119+
}
120+
parsedStreamToWriteConfig(namingResolver, rawSuffix).apply(it)
121+
}
104122
}
105123

106124
private fun parsedStreamToWriteConfig(
@@ -124,7 +142,7 @@ object JdbcBufferedConsumerFactory {
124142
streamConfig.syncId,
125143
streamConfig.generationId,
126144
streamConfig.minimumGenerationId,
127-
rawTableSuffix
145+
rawTableSuffix,
128146
)
129147
}
130148
}
@@ -152,7 +170,9 @@ object JdbcBufferedConsumerFactory {
152170
): OnStartFunction {
153171
return OnStartFunction {
154172
typerDeduper.prepareSchemasAndRunMigrations()
155-
writeConfigs.addAll(createWriteConfigs(namingResolver, parsedCatalog))
173+
writeConfigs.addAll(
174+
createWriteConfigs(database, sqlOperations, namingResolver, parsedCatalog)
175+
)
156176
LOGGER.info {
157177
"Preparing raw tables in destination started for ${writeConfigs.size} streams"
158178
}
@@ -165,15 +185,30 @@ object JdbcBufferedConsumerFactory {
165185
}
166186
sqlOperations.createSchemaIfNotExists(database, schemaName)
167187
sqlOperations.createTableIfNotExists(database, schemaName, dstTableName)
188+
// if rawSuffix is empty, this is a no-op
189+
sqlOperations.createTableIfNotExists(
190+
database,
191+
schemaName,
192+
dstTableName + writeConfig.rawTableSuffix
193+
)
168194
when (writeConfig.minimumGenerationId) {
169195
writeConfig.generationId ->
170-
queryList.add(
171-
sqlOperations.truncateTableQuery(
196+
if (
197+
sqlOperations.isOtherGenerationIdInTable(
172198
database,
199+
writeConfig.generationId,
173200
schemaName,
174-
dstTableName,
201+
dstTableName + writeConfig.rawTableSuffix
175202
)
176-
)
203+
) {
204+
queryList.add(
205+
sqlOperations.truncateTableQuery(
206+
database,
207+
schemaName,
208+
dstTableName + writeConfig.rawTableSuffix,
209+
)
210+
)
211+
}
177212
0L -> {}
178213
else ->
179214
throw IllegalStateException(
@@ -224,19 +259,41 @@ object JdbcBufferedConsumerFactory {
224259
database,
225260
ArrayList(records),
226261
writeConfig.rawNamespace,
227-
writeConfig.rawTableName,
262+
writeConfig.rawTableName + writeConfig.rawTableSuffix,
228263
writeConfig.syncId,
229264
writeConfig.generationId,
230265
)
231266
}
232267
}
233268

234269
/** Tear down functionality */
235-
private fun onCloseFunction(typerDeduper: TyperDeduper): OnCloseFunction {
270+
private fun onCloseFunction(
271+
database: JdbcDatabase,
272+
sqlOperations: SqlOperations,
273+
catalog: ParsedCatalog,
274+
typerDeduper: TyperDeduper
275+
): OnCloseFunction {
236276
return OnCloseFunction {
237277
_: Boolean,
238278
streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary> ->
239279
try {
280+
catalog.streams.forEach {
281+
if (
282+
it.minimumGenerationId == it.generationId &&
283+
sqlOperations.isOtherGenerationIdInTable(
284+
database,
285+
it.generationId,
286+
it.id.rawNamespace,
287+
it.id.rawName
288+
) &&
289+
streamSyncSummaries
290+
.getValue(it.id.asStreamDescriptor())
291+
.terminalStatus ==
292+
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
293+
) {
294+
sqlOperations.overwriteRawTable(database, it.id.rawNamespace, it.id.rawName)
295+
}
296+
}
240297
typerDeduper.typeAndDedupe(streamSyncSummaries)
241298
typerDeduper.commitFinalTables()
242299
typerDeduper.cleanup()

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
1111
import java.util.stream.Stream
1212

1313
class JdbcInsertFlushFunction(
14+
private val defaultNamespace: String,
1415
private val recordWriter: RecordWriter<PartialAirbyteMessage>,
1516
override val optimalBatchSizeBytes: Long
1617
) : DestinationFlushFunction {
1718
@Throws(Exception::class)
1819
override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
1920
recordWriter.accept(
20-
AirbyteStreamNameNamespacePair(streamDescriptor.name, streamDescriptor.namespace),
21+
AirbyteStreamNameNamespacePair(
22+
streamDescriptor.name,
23+
streamDescriptor.namespace ?: defaultNamespace
24+
),
2125
stream.toList()
2226
)
2327
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt

+28-19
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ abstract class JdbcSqlOperations : SqlOperations {
120120
%s JSONB,
121121
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
122122
%s TIMESTAMP WITH TIME ZONE DEFAULT NULL,
123-
%s JSONB
123+
%s JSONB,
124+
%s BIGINT
124125
);
125126
126127
""".trimIndent(),
@@ -130,14 +131,20 @@ abstract class JdbcSqlOperations : SqlOperations {
130131
JavaBaseConstants.COLUMN_NAME_DATA,
131132
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
132133
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
133-
JavaBaseConstants.COLUMN_NAME_AB_META
134+
JavaBaseConstants.COLUMN_NAME_AB_META,
135+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
134136
)
135137
}
136138

137139
// TODO: This method seems to be used by Postgres and others while staging to local temp files.
138140
// Should there be a Local staging operations equivalent
139141
@Throws(Exception::class)
140-
protected fun writeBatchToFile(tmpFile: File?, records: List<PartialAirbyteMessage>) {
142+
protected fun writeBatchToFile(
143+
tmpFile: File?,
144+
records: List<PartialAirbyteMessage>,
145+
syncId: Long,
146+
generationId: Long
147+
) {
141148
PrintWriter(tmpFile, StandardCharsets.UTF_8).use { writer ->
142149
CSVPrinter(writer, CSVFormat.DEFAULT).use { csvPrinter ->
143150
for (record in records) {
@@ -146,14 +153,28 @@ abstract class JdbcSqlOperations : SqlOperations {
146153
val jsonData = record.serialized
147154
val airbyteMeta =
148155
if (record.record!!.meta == null) {
149-
"{\"changes\":[]}"
156+
"""{"changes":[],${JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY}":$syncId}"""
150157
} else {
151-
Jsons.serialize(record.record!!.meta)
158+
Jsons.serialize(
159+
record.record!!
160+
.meta!!
161+
.withAdditionalProperty(
162+
JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY,
163+
syncId,
164+
)
165+
)
152166
}
153167
val extractedAt =
154168
Timestamp.from(Instant.ofEpochMilli(record.record!!.emittedAt))
155169
if (isDestinationV2) {
156-
csvPrinter.printRecord(uuid, jsonData, extractedAt, null, airbyteMeta)
170+
csvPrinter.printRecord(
171+
uuid,
172+
jsonData,
173+
extractedAt,
174+
null,
175+
airbyteMeta,
176+
generationId
177+
)
157178
} else {
158179
csvPrinter.printRecord(uuid, jsonData, extractedAt)
159180
}
@@ -229,21 +250,9 @@ abstract class JdbcSqlOperations : SqlOperations {
229250
syncId: Long,
230251
generationId: Long
231252
) {
232-
if (isDestinationV2) {
233-
insertRecordsInternalV2(database, records, schemaName, tableName, syncId, generationId)
234-
} else {
235-
insertRecordsInternal(database, records, schemaName, tableName)
236-
}
253+
insertRecordsInternalV2(database, records, schemaName, tableName, syncId, generationId)
237254
}
238255

239-
@Throws(Exception::class)
240-
protected abstract fun insertRecordsInternal(
241-
database: JdbcDatabase,
242-
records: List<PartialAirbyteMessage>,
243-
schemaName: String?,
244-
tableName: String?
245-
)
246-
247256
@Throws(Exception::class)
248257
protected abstract fun insertRecordsInternalV2(
249258
database: JdbcDatabase,

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt

+13-9
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,28 @@ import org.mockito.Mockito
1212

1313
class TestJdbcSqlOperations : JdbcSqlOperations() {
1414
@Throws(Exception::class)
15-
public override fun insertRecordsInternal(
15+
override fun insertRecordsInternalV2(
1616
database: JdbcDatabase,
1717
records: List<PartialAirbyteMessage>,
1818
schemaName: String?,
19-
tableName: String?
19+
tableName: String?,
20+
syncId: Long,
21+
generationId: Long,
2022
) {
2123
// Not required for the testing
2224
}
2325

24-
@Throws(Exception::class)
25-
override fun insertRecordsInternalV2(
26+
override fun isOtherGenerationIdInTable(
2627
database: JdbcDatabase,
27-
records: List<PartialAirbyteMessage>,
28-
schemaName: String?,
29-
tableName: String?,
30-
syncId: Long,
3128
generationId: Long,
32-
) {
29+
rawNamespace: String,
30+
rawName: String
31+
): Boolean {
32+
return false
33+
// Not required for the testing
34+
}
35+
36+
override fun overwriteRawTable(database: JdbcDatabase, rawNamespace: String, rawName: String) {
3337
// Not required for the testing
3438
}
3539

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
331331
}
332332

333333
companion object {
334-
private const val NO_SUFFIX = ""
334+
const val NO_SUFFIX = ""
335335
const val TMP_TABLE_SUFFIX = "_airbyte_tmp"
336336
}
337337
}

0 commit comments

Comments
 (0)