Skip to content

Commit bd6aa0f

Browse files
gisripaxiaohansong
authored andcommitted
Destinations CDK: Refactor T+D to gather required world state upfront (#35342)
Signed-off-by: Gireesh Sreepathi <[email protected]>
1 parent f1e46c0 commit bd6aa0f

File tree

36 files changed

+682
-483
lines changed

36 files changed

+682
-483
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+96-95
Large diffs are not rendered by default.

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.java

+12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage;
1111
import io.airbyte.commons.exceptions.ConfigErrorException;
1212
import io.airbyte.commons.exceptions.ConnectionErrorException;
13+
import io.airbyte.commons.functional.Either;
1314
import java.sql.SQLException;
1415
import java.sql.SQLSyntaxErrorException;
1516
import java.util.Collection;
@@ -85,6 +86,17 @@ public static <T extends Throwable> void logAllAndThrowFirst(final String initia
8586
}
8687
}
8788

89+
public static <T extends Throwable, Result> List<Result> getResultsOrLogAndThrowFirst(final String initialMessage,
90+
final List<Either<? extends T, Result>> eithers)
91+
throws T {
92+
List<? extends T> throwables = eithers.stream().filter(Either::isLeft).map(Either::getLeft).toList();
93+
if (!throwables.isEmpty()) {
94+
logAllAndThrowFirst(initialMessage, throwables);
95+
}
96+
// No need to filter on isRight since isLeft will throw before reaching this line.
97+
return eithers.stream().map(Either::getRight).toList();
98+
}
99+
88100
private static boolean isConfigErrorException(Throwable e) {
89101
return e instanceof ConfigErrorException;
90102
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.22.1
1+
version=0.23.0

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
252252

253253
protected abstract JdbcSqlGenerator getSqlGenerator();
254254

255-
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
256-
return new JdbcDestinationHandler(databaseName, database);
257-
}
255+
protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);
258256

259257
/**
260258
* "database" key at root of the config json, for any other variants in config, override this
@@ -318,14 +316,14 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
318316
final String databaseName = getDatabaseName(config);
319317
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
320318
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
321-
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
319+
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
322320
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
323321
final TyperDeduper typerDeduper;
324322
if (disableTypeDedupe) {
325-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
323+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
326324
} else {
327325
typerDeduper =
328-
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
326+
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
329327
}
330328
return typerDeduper;
331329
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/ColumnDefinition.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@
44

55
package io.airbyte.cdk.integrations.destination.jdbc;
66

7-
import java.sql.SQLType;
8-
9-
public record ColumnDefinition(String name, String type, SQLType sqlType, int columnSize) {
7+
/**
8+
* Jdbc destination column definition representation
9+
*
10+
* @param name
11+
* @param type
12+
* @param columnSize
13+
*/
14+
public record ColumnDefinition(String name, String type, int columnSize, boolean isNullable) {
1015

1116
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/CustomSqlType.java

-33
This file was deleted.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.LinkedHashMap;
88

99
/**
10-
* Jdbc destination table definition representation
10+
* Jdbc destination table definition representation with a map of column names to column definitions
1111
*
1212
* @param columns
1313
*/

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TableSchemaRecordSet.java

-9
This file was deleted.

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/TypeInfoRecordSet.java

-71
This file was deleted.

0 commit comments

Comments
 (0)