Skip to content

Commit cf0bbee

Browse files
gisripajatinyadav-cc
authored and committed
Destination Redshift: CDK T+D initial state refactor (airbytehq#35354)
Signed-off-by: Gireesh Sreepathi <[email protected]>
1 parent ca29b4c commit cf0bbee

File tree

10 files changed

+145
-203
lines changed

10 files changed

+145
-203
lines changed

airbyte-integrations/connectors/destination-redshift/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.20.0'
7+
cdkVersionRequired = '0.23.2'
88
features = ['db-destinations', 's3-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}

airbyte-integrations/connectors/destination-redshift/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
8-
dockerImageTag: 2.1.7
8+
dockerImageTag: 2.1.8
99
dockerRepository: airbyte/destination-redshift
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
1111
githubIssueLabel: destination-redshift

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
228228
final JdbcV1V2Migrator migrator = new JdbcV1V2Migrator(getNamingResolver(), database, databaseName);
229229
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
230230
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
231-
final int defaultThreadCount = 8;
232231
if (disableTypeDedupe) {
233-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator,
234-
defaultThreadCount);
232+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
235233
} else {
236234
typerDeduper =
237-
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount);
235+
new DefaultTyperDeduper(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
238236
}
239237
return StagingConsumerFactory.builder(
240238
outputRecordCollector,

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftDestinationHandler.java

+37-32
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@
44

55
package io.airbyte.integrations.destination.redshift.typing_deduping;
66

7-
import com.fasterxml.jackson.databind.JsonNode;
7+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.*;
8+
89
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
910
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
11+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
12+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
13+
import io.airbyte.integrations.base.destination.typing_deduping.Array;
1014
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
11-
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
15+
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
16+
import io.airbyte.integrations.base.destination.typing_deduping.Union;
17+
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
1218
import java.sql.SQLException;
1319
import java.util.ArrayList;
1420
import java.util.List;
@@ -49,37 +55,36 @@ public void execute(final Sql sql) throws Exception {
4955
}
5056
}
5157

52-
/**
53-
* Issuing a select 1 limit 1 query can be expensive, so relying on SVV_TABLE_INFO system table.
54-
* EXPLAIN of the select 1 from table limit 1 query: (seq scan and then limit is applied, read from
55-
* bottom to top) XN Lim it (co st=0. 0 .0.01 rows=1 width=0) -> XN Seq Scan on _airbyte_raw_ users
56-
* (cost=0.00..1000.00 rows=100000 width=0)
57-
*
58-
* @param id
59-
* @return
60-
* @throws Exception
61-
*/
6258
@Override
63-
public boolean isFinalTableEmpty(final StreamId id) throws Exception {
64-
// Redshift doesn't have an information_schema.tables table, so we have to use SVV_TABLE_INFO.
65-
// From https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_TABLE_INFO.html:
66-
// > The SVV_TABLE_INFO view doesn't return any information for empty tables.
67-
// So we just query for our specific table, and if we get no rows back,
68-
// then we assume the table is empty.
69-
// Note that because the column names are reserved words (table, schema, database),
70-
// we need to enquote them.
71-
final List<JsonNode> query = jdbcDatabase.queryJsons(
72-
"""
73-
SELECT 1
74-
FROM SVV_TABLE_INFO
75-
WHERE "database" = ?
76-
AND "schema" = ?
77-
AND "table" = ?
78-
""",
79-
databaseName,
80-
id.finalNamespace(),
81-
id.finalName());
82-
return query.isEmpty();
59+
protected String toJdbcTypeName(AirbyteType airbyteType) {
60+
// This is mostly identical to the postgres implementation, but swaps jsonb to super
61+
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
62+
return toJdbcTypeName(airbyteProtocolType);
63+
}
64+
return switch (airbyteType.getTypeName()) {
65+
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "super";
66+
// No nested Unions supported so this will definitely not result in infinite recursion.
67+
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
68+
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
69+
};
8370
}
8471

72+
private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
73+
return switch (airbyteProtocolType) {
74+
case STRING -> "varchar";
75+
case NUMBER -> "numeric";
76+
case INTEGER -> "int8";
77+
case BOOLEAN -> "bool";
78+
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
79+
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
80+
case TIME_WITH_TIMEZONE -> "timetz";
81+
case TIME_WITHOUT_TIMEZONE -> "time";
82+
case DATE -> "date";
83+
case UNKNOWN -> "super";
84+
};
85+
}
86+
87+
// Do not use SVV_TABLE_INFO to get isFinalTableEmpty.
88+
// See https://github.com/airbytehq/airbyte/issues/34357
89+
8590
}

airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGenerator.java

-39
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,19 @@
1616
import static org.jooq.impl.DSL.rowNumber;
1717
import static org.jooq.impl.DSL.val;
1818

19-
import com.google.common.collect.ImmutableMap;
20-
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
2119
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
22-
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
2320
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2421
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
2522
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
2623
import io.airbyte.integrations.base.destination.typing_deduping.Array;
2724
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
28-
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
2925
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
3026
import io.airbyte.integrations.base.destination.typing_deduping.Union;
3127
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
3228
import java.sql.Timestamp;
3329
import java.util.ArrayList;
3430
import java.util.LinkedHashMap;
3531
import java.util.List;
36-
import java.util.Map;
3732
import java.util.Optional;
3833
import java.util.stream.Collectors;
3934
import org.jooq.Condition;
@@ -47,12 +42,6 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
4742

4843
public static final String CASE_STATEMENT_SQL_TEMPLATE = "CASE WHEN {0} THEN {1} ELSE {2} END ";
4944
public static final String CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE = "CASE WHEN {0} THEN {1} END ";
50-
private static final Map<String, String> REDSHIFT_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
51-
"numeric", "decimal",
52-
"int8", "bigint",
53-
"bool", "boolean",
54-
"timestamptz", "timestamp with time zone",
55-
"timetz", "time with time zone");
5645
private static final String COLUMN_ERROR_MESSAGE_FORMAT = "Problem with `%s`";
5746
private static final String AIRBYTE_META_COLUMN_ERRORS_KEY = "errors";
5847

@@ -168,7 +157,6 @@ Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
168157
}
169158

170159
Field<?> result = arrays.get(0);
171-
String renderedSql = getDslContext().render(result);
172160
for (int i = 1; i < arrays.size(); i++) {
173161
// We lose some nice indentation but thats ok. Queryparts
174162
// are intentionally rendered here to avoid deep stack for function sql rendering.
@@ -199,29 +187,6 @@ protected Field<?> buildAirbyteMetaColumn(final LinkedHashMap<ColumnId, AirbyteT
199187

200188
}
201189

202-
@Override
203-
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
204-
// Check that the columns match, with special handling for the metadata columns.
205-
// This is mostly identical to the redshift implementation, but swaps jsonb to super
206-
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
207-
.collect(LinkedHashMap::new,
208-
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
209-
LinkedHashMap::putAll);
210-
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
211-
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
212-
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
213-
.collect(LinkedHashMap::new,
214-
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromRedshiftTypeName(column.getValue().type())),
215-
LinkedHashMap::putAll);
216-
217-
final boolean sameColumns = actualColumns.equals(intendedColumns)
218-
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
219-
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
220-
&& "super".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());
221-
222-
return sameColumns;
223-
}
224-
225190
/**
226191
* Return ROW_NUMBER() OVER (PARTITION BY primaryKeys ORDER BY cursor DESC NULLS LAST,
227192
* _airbyte_extracted_at DESC)
@@ -265,8 +230,4 @@ public boolean shouldRetry(final Exception e) {
265230
return false;
266231
}
267232

268-
private static String jdbcTypeNameFromRedshiftTypeName(final String redshiftType) {
269-
return REDSHIFT_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
270-
}
271-
272233
}

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java

-5
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,4 @@ protected int getMaxRecordValueLimit() {
241241
return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE;
242242
}
243243

244-
@Override
245-
protected int getGenerateBigStringAddExtraCharacters() {
246-
return 1;
247-
}
248-
249244
}

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {
3434
}
3535

3636
@Override
37-
protected SqlGenerator<?> getSqlGenerator() {
37+
protected SqlGenerator getSqlGenerator() {
3838
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
3939

4040
// Override only for tests to print formatted SQL. The actual implementation should use unformatted

airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java

+9-27
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;
88
import static io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.escapeStringLiteral;
9-
import static org.junit.jupiter.api.Assertions.assertAll;
109
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertFalse;
1111
import static org.junit.jupiter.api.Assertions.assertTrue;
1212

1313
import com.fasterxml.jackson.databind.JsonNode;
@@ -17,11 +17,11 @@
1717
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1818
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
1919
import io.airbyte.cdk.db.jdbc.JdbcUtils;
20-
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
2120
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
2221
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
2322
import io.airbyte.commons.json.Jsons;
2423
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
24+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
2525
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
2626
import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination;
2727
import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer;
@@ -33,7 +33,7 @@
3333
import java.time.LocalDateTime;
3434
import java.time.OffsetTime;
3535
import java.time.ZoneOffset;
36-
import java.util.Optional;
36+
import java.util.List;
3737
import javax.sql.DataSource;
3838
import org.jooq.DSLContext;
3939
import org.jooq.DataType;
@@ -151,7 +151,7 @@ protected DSLContext getDslContext() {
151151
}
152152

153153
@Override
154-
protected DestinationHandler<TableDefinition> getDestinationHandler() {
154+
protected DestinationHandler getDestinationHandler() {
155155
return new RedshiftDestinationHandler(databaseName, database);
156156
}
157157

@@ -180,29 +180,11 @@ protected Field<?> toJsonValue(final String valueAsString) {
180180
public void testCreateTableIncremental() throws Exception {
181181
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
182182
destinationHandler.execute(sql);
183-
184-
final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());
185-
186-
assertTrue(existingTable.isPresent());
187-
assertAll(
188-
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
189-
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
190-
() -> assertEquals("super", existingTable.get().columns().get("_airbyte_meta").type()),
191-
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
192-
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
193-
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
194-
() -> assertEquals("super", existingTable.get().columns().get("struct").type()),
195-
() -> assertEquals("super", existingTable.get().columns().get("array").type()),
196-
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
197-
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
198-
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
199-
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
200-
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
201-
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
202-
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
203-
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
204-
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
205-
() -> assertEquals("super", existingTable.get().columns().get("unknown").type()));
183+
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
184+
assertEquals(1, initialStates.size());
185+
final DestinationInitialState initialState = initialStates.getFirst();
186+
assertTrue(initialState.isFinalTablePresent());
187+
assertFalse(initialState.isSchemaMismatch());
206188
// TODO assert on table clustering, etc.
207189
}
208190

0 commit comments

Comments (0)