Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d158042

Browse files
edgaogisripa
authored and committed Mar 1, 2024 ·
add DestinationState stuff
1 parent 358a500 commit d158042

File tree

22 files changed

+904
-219
lines changed

22 files changed

+904
-219
lines changed
 

‎airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.cdk.integrations.destination.jdbc;
66

7+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
78
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
89
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;
910

@@ -17,7 +18,6 @@
1718
import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
1819
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
1920
import io.airbyte.cdk.integrations.base.Destination;
20-
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
2121
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
2222
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
2323
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
@@ -37,6 +37,7 @@
3737
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
3838
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
3939
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
40+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
4041
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
4142
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
4243
import io.airbyte.protocol.models.v0.AirbyteMessage;
@@ -45,6 +46,7 @@
4546
import java.util.List;
4647
import java.util.Map;
4748
import java.util.Objects;
49+
import java.util.Optional;
4850
import java.util.UUID;
4951
import java.util.function.Consumer;
5052
import javax.sql.DataSource;
@@ -93,7 +95,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
9395
attemptTableOperations(outputSchema, database, namingResolver, sqlOperations, false);
9496
if (TypingAndDedupingFlag.isDestinationV2()) {
9597
final var v2RawSchema = namingResolver.getIdentifier(TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
96-
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
98+
.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
9799
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false);
98100
destinationSpecificTableOperations(database);
99101
}
@@ -252,7 +254,9 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
252254

253255
protected abstract JdbcSqlGenerator getSqlGenerator();
254256

255-
protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database);
257+
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
258+
final JdbcDatabase database,
259+
final String rawTableSchema);
256260

257261
/**
258262
* "database" key at root of the config json, for any other variants in config, override this
@@ -309,21 +313,23 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
309313
*/
310314
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
311315
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
312-
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
316+
Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE);
317+
final ParsedCatalog parsedCatalog = rawNamespaceOverride
313318
.map(override -> new CatalogParser(sqlGenerator, override))
314319
.orElse(new CatalogParser(sqlGenerator))
315320
.parseCatalog(catalog);
316321
final String databaseName = getDatabaseName(config);
317322
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
318323
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
319-
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
324+
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
325+
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
320326
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
321327
final TyperDeduper typerDeduper;
322328
if (disableTypeDedupe) {
323-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
329+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
324330
} else {
325331
typerDeduper =
326-
new DefaultTyperDeduper(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
332+
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
327333
}
328334
return typerDeduper;
329335
}

‎airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java

+136-23
Large diffs are not rendered by default.

‎airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
142142
}
143143

144144
@Override
145-
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
145+
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
146146
return null;
147147
}
148148

‎airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
2323
import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest;
2424
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
25+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
2526
import java.sql.SQLException;
2627
import java.util.Arrays;
2728
import java.util.List;
@@ -36,7 +37,8 @@
3637
import org.jooq.impl.DSL;
3738
import org.jooq.impl.SQLDataType;
3839

39-
public abstract class JdbcSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest {
40+
public abstract class JdbcSqlGeneratorIntegrationTest<DestinationState extends MinimumDestinationState>
41+
extends BaseSqlGeneratorIntegrationTest<DestinationState> {
4042

4143
protected abstract JdbcDatabase getDatabase();
4244

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implem
2020
@Override
2121
public void migrateIfNecessary(
2222
final SqlGenerator sqlGenerator,
23-
final DestinationHandler destinationHandler,
23+
final DestinationHandler<?> destinationHandler,
2424
final StreamConfig streamConfig)
2525
throws Exception {
2626
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
@@ -60,7 +60,7 @@ protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exceptio
6060
* @param streamConfig the stream to migrate the raw table of
6161
*/
6262
public void migrate(final SqlGenerator sqlGenerator,
63-
final DestinationHandler destinationHandler,
63+
final DestinationHandler<?> destinationHandler,
6464
final StreamConfig streamConfig)
6565
throws TableNotMigratedException {
6666
final var namespacedTableName = convertToV1RawName(streamConfig);

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

+57-32
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66

77
import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
88
import static io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst;
9-
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.*;
9+
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads;
1010
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions;
1111
import static java.util.Collections.singleton;
12+
import static java.util.stream.Collectors.toMap;
1213

1314
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
1415
import io.airbyte.commons.concurrency.CompletableFutures;
1516
import io.airbyte.commons.functional.Either;
17+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
18+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
1619
import io.airbyte.protocol.models.v0.DestinationSyncMode;
1720
import io.airbyte.protocol.models.v0.StreamDescriptor;
1821
import java.util.HashSet;
@@ -49,22 +52,23 @@
4952
* Note that #prepareFinalTables() initializes some internal state. The other methods will throw an
5053
* exception if that method was not called.
5154
*/
52-
public class DefaultTyperDeduper implements TyperDeduper {
55+
public class DefaultTyperDeduper<DestinationState extends MinimumDestinationState> implements TyperDeduper {
5356

5457
private static final Logger LOGGER = LoggerFactory.getLogger(TyperDeduper.class);
5558

5659
private static final String NO_SUFFIX = "";
5760
private static final String TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp";
5861

5962
private final SqlGenerator sqlGenerator;
60-
private final DestinationHandler destinationHandler;
63+
private final DestinationHandler<DestinationState> destinationHandler;
6164

6265
private final DestinationV1V2Migrator v1V2Migrator;
6366
private final V2TableMigrator v2TableMigrator;
67+
private final List<Migration<DestinationState>> migrations;
6468
private final ParsedCatalog parsedCatalog;
6569
private Set<StreamId> overwriteStreamsWithTmpTable;
6670
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
67-
private final Map<StreamId, InitialRawTableState> initialRawTableStateByStream;
71+
private final Map<StreamId, InitialRawTableStatus> initialRawTableStateByStream;
6872
// We only want to run a single instance of T+D per stream at a time. These objects are used for
6973
// synchronization per stream.
7074
// Use a read-write lock because we need the same semantics:
@@ -77,17 +81,20 @@ public class DefaultTyperDeduper implements TyperDeduper {
7781
private final Map<StreamId, Lock> internalTdLocks;
7882

7983
private final ExecutorService executorService;
84+
private List<DestinationInitialStatus<DestinationState>> destinationInitialStatuses;
8085

8186
public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
82-
final DestinationHandler destinationHandler,
87+
final DestinationHandler<DestinationState> destinationHandler,
8388
final ParsedCatalog parsedCatalog,
8489
final DestinationV1V2Migrator v1V2Migrator,
85-
final V2TableMigrator v2TableMigrator) {
90+
final V2TableMigrator v2TableMigrator,
91+
final List<Migration<DestinationState>> migrations) {
8692
this.sqlGenerator = sqlGenerator;
8793
this.destinationHandler = destinationHandler;
8894
this.parsedCatalog = parsedCatalog;
8995
this.v1V2Migrator = v1V2Migrator;
9096
this.v2TableMigrator = v2TableMigrator;
97+
this.migrations = migrations;
9198
this.initialRawTableStateByStream = new ConcurrentHashMap<>();
9299
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
93100
this.tdLocks = new ConcurrentHashMap<>();
@@ -96,20 +103,45 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
96103
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
97104
}
98105

99-
public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
100-
final DestinationHandler destinationHandler,
106+
public DefaultTyperDeduper(
107+
final SqlGenerator sqlGenerator,
108+
final DestinationHandler<DestinationState> destinationHandler,
101109
final ParsedCatalog parsedCatalog,
102-
final DestinationV1V2Migrator v1V2Migrator) {
103-
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator());
110+
final DestinationV1V2Migrator v1V2Migrator,
111+
final List<Migration<DestinationState>> migrations) {
112+
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), migrations);
104113
}
105114

106115
@Override
107-
public void prepareSchemasAndRunMigrations() {
116+
public void prepareSchemasAndRunMigrations() throws Exception {
108117
// Technically kind of weird to call this here, but it's the best place we have.
109118
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
110119
// until prepareFinalTables... but it doesn't really matter.
111120
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
112-
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
121+
122+
TyperDeduperUtil.executeWeirdMigrations(
123+
executorService,
124+
sqlGenerator,
125+
destinationHandler,
126+
v1V2Migrator,
127+
v2TableMigrator,
128+
parsedCatalog);
129+
130+
destinationInitialStatuses = TyperDeduperUtil.executeRawTableMigrations(
131+
executorService,
132+
destinationHandler,
133+
migrations,
134+
destinationHandler.gatherInitialState(parsedCatalog.streams()));
135+
136+
// Commit our destination states immediately.
137+
// Technically, migrations aren't done until we execute the soft reset.
138+
// However, our state contains a needsSoftReset flag, so we can commit that we already executed the
139+
// migration
140+
// and even if we fail to run the soft reset in this sync, future syncs will see the soft reset flag
141+
// and finish it for us.
142+
destinationHandler.commitDestinationStates(destinationInitialStatuses.stream().collect(toMap(
143+
state -> state.streamConfig().id(),
144+
DestinationInitialStatus::destinationState)));
113145
}
114146

115147
@Override
@@ -120,25 +152,17 @@ public void prepareFinalTables() throws Exception {
120152
overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet();
121153
LOGGER.info("Preparing tables");
122154

123-
final List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(parsedCatalog.streams());
124155
final List<Either<? extends Exception, Void>> prepareTablesFutureResult = CompletableFutures.allOf(
125-
initialStates.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join();
156+
destinationInitialStatuses.stream().map(this::prepareTablesFuture).toList()).toCompletableFuture().join();
126157
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to prepare tables:\n", prepareTablesFutureResult);
127-
}
128158

129-
private CompletionStage<Void> runMigrationsAsync(StreamConfig streamConfig) {
130-
return CompletableFuture.runAsync(() -> {
131-
try {
132-
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
133-
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig);
134-
v2TableMigrator.migrateIfNecessary(streamConfig);
135-
} catch (Exception e) {
136-
throw new RuntimeException(e);
137-
}
138-
}, this.executorService);
159+
destinationHandler.commitDestinationStates(destinationInitialStatuses.stream().collect(toMap(
160+
state -> state.streamConfig().id(),
161+
// If we get here, then we've executed all soft resets. Force the soft reset flag to false.
162+
state -> state.destinationState().withSoftReset(false))));
139163
}
140164

141-
private CompletionStage<Void> prepareTablesFuture(final DestinationInitialState initialState) {
165+
private CompletionStage<Void> prepareTablesFuture(final DestinationInitialStatus<DestinationState> initialState) {
142166
// For each stream, make sure that its corresponding final table exists.
143167
// Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an
144168
// _airbyte_tmp table.
@@ -160,9 +184,10 @@ private CompletionStage<Void> prepareTablesFuture(final DestinationInitialState
160184
LOGGER.info("Final Table for stream {} is empty and matches the expected v2 format, writing to table directly",
161185
stream.id().finalName());
162186
}
163-
164-
} else if (initialState.isSchemaMismatch()) {
165-
// We're loading data directly into the existing table. Make sure it has the right schema.
187+
} else if (initialState.isSchemaMismatch() || initialState.destinationState().needsSoftReset()) {
188+
// We're loading data directly into the existing table.
189+
// Make sure it has the right schema.
190+
// Also, if a raw table migration wants us to do a soft reset, do that here.
166191
TypeAndDedupeTransaction.executeSoftReset(sqlGenerator, destinationHandler, stream);
167192
}
168193
} else {
@@ -171,7 +196,7 @@ private CompletionStage<Void> prepareTablesFuture(final DestinationInitialState
171196
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
172197
}
173198

174-
initialRawTableStateByStream.put(stream.id(), initialState.initialRawTableState());
199+
initialRawTableStateByStream.put(stream.id(), initialState.initialRawTableStatus());
175200

176201
streamsWithSuccessfulSetup.add(Pair.of(stream.id().originalNamespace(), stream.id().originalName()));
177202

@@ -247,12 +272,12 @@ public CompletableFuture<Optional<Exception>> typeAndDedupeTask(final StreamConf
247272
final Lock externalLock = tdLocks.get(streamConfig.id()).writeLock();
248273
externalLock.lock();
249274
try {
250-
final InitialRawTableState initialRawTableState = initialRawTableStateByStream.get(streamConfig.id());
275+
final InitialRawTableStatus initialRawTableStatus = initialRawTableStateByStream.get(streamConfig.id());
251276
TypeAndDedupeTransaction.executeTypeAndDedupe(
252277
sqlGenerator,
253278
destinationHandler,
254279
streamConfig,
255-
initialRawTableState.maxProcessedTimestamp(),
280+
initialRawTableStatus.maxProcessedTimestamp(),
256281
getFinalTableSuffix(streamConfig.id()));
257282
} finally {
258283
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationHandler.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,20 @@
55
package io.airbyte.integrations.base.destination.typing_deduping;
66

77
import java.util.List;
8+
import java.util.Map;
89

9-
public interface DestinationHandler {
10+
public interface DestinationHandler<DestinationState> {
1011

1112
void execute(final Sql sql) throws Exception;
1213

13-
List<DestinationInitialState> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception;
14+
/**
15+
* Fetch the current state of the destination for the given streams. This method MUST create the
16+
* airbyte_internal.state table if it does not exist. This method MAY assume the airbyte_internal
17+
* schema already exists. (substitute the appropriate raw table schema if the user is overriding
18+
* it).
19+
*/
20+
List<DestinationInitialStatus<DestinationState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception;
21+
22+
void commitDestinationStates(final Map<StreamId, DestinationState> destinationStates) throws Exception;
1423

1524
}

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialState.java

-23
This file was deleted.

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStateImpl.java

-14
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.integrations.base.destination.typing_deduping
5+
6+
@JvmRecord
7+
data class DestinationInitialStatus<DestinationState>(val streamConfig: StreamConfig,
8+
val isFinalTablePresent: Boolean,
9+
val initialRawTableStatus: InitialRawTableStatus,
10+
val isSchemaMismatch: Boolean,
11+
val isFinalTableEmpty: Boolean,
12+
val destinationState: DestinationState)

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public interface DestinationV1V2Migrator {
1818
*/
1919
void migrateIfNecessary(
2020
final SqlGenerator sqlGenerator,
21-
final DestinationHandler destinationHandler,
21+
final DestinationHandler<?> destinationHandler,
2222
final StreamConfig streamConfig)
2323
throws TableNotMigratedException, UnexpectedSchemaException, Exception;
2424

Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
import java.time.Instant;
88
import java.util.Optional;
99

10-
public record InitialRawTableState(boolean hasUnprocessedRecords, Optional<Instant> maxProcessedTimestamp) {
10+
public record InitialRawTableStatus(boolean rawTableExists, boolean hasUnprocessedRecords, Optional<Instant> maxProcessedTimestamp) {
1111

1212
}

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class NoOpDestinationV1V2Migrator implements DestinationV1V2Migrator {
88

99
@Override
1010
public void migrateIfNecessary(final SqlGenerator sqlGenerator,
11-
final DestinationHandler destinationHandler,
11+
final DestinationHandler<?> destinationHandler,
1212
final StreamConfig streamConfig)
1313
throws TableNotMigratedException, UnexpectedSchemaException {
1414
// Do nothing

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66

77
import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME;
88
import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads;
9+
import static java.util.stream.Collectors.toMap;
910

1011
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
12+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
13+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
1114
import io.airbyte.protocol.models.v0.StreamDescriptor;
15+
import java.util.List;
1216
import java.util.Map;
1317
import java.util.concurrent.ExecutorService;
1418
import java.util.concurrent.Executors;
@@ -22,33 +26,55 @@
2226
* json->string migrations in the raw tables.
2327
*/
2428
@Slf4j
25-
public class NoOpTyperDeduperWithV1V2Migrations implements TyperDeduper {
29+
public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends MinimumDestinationState> implements TyperDeduper {
2630

2731
private final DestinationV1V2Migrator v1V2Migrator;
2832
private final V2TableMigrator v2TableMigrator;
33+
private final List<Migration<DestinationState>> migrations;
2934
private final ExecutorService executorService;
3035
private final ParsedCatalog parsedCatalog;
3136
private final SqlGenerator sqlGenerator;
32-
private final DestinationHandler destinationHandler;
37+
private final DestinationHandler<DestinationState> destinationHandler;
3338

3439
public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
35-
final DestinationHandler destinationHandler,
40+
final DestinationHandler<DestinationState> destinationHandler,
3641
final ParsedCatalog parsedCatalog,
3742
final DestinationV1V2Migrator v1V2Migrator,
38-
final V2TableMigrator v2TableMigrator) {
43+
final V2TableMigrator v2TableMigrator,
44+
final List<Migration<DestinationState>> migrations) {
3945
this.sqlGenerator = sqlGenerator;
4046
this.destinationHandler = destinationHandler;
4147
this.parsedCatalog = parsedCatalog;
4248
this.v1V2Migrator = v1V2Migrator;
4349
this.v2TableMigrator = v2TableMigrator;
50+
this.migrations = migrations;
4451
this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(),
4552
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
4653
}
4754

4855
@Override
49-
public void prepareSchemasAndRunMigrations() {
56+
public void prepareSchemasAndRunMigrations() throws Exception {
5057
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
51-
TyperDeduperUtil.executeRawTableMigrations(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, parsedCatalog);
58+
59+
TyperDeduperUtil.executeWeirdMigrations(
60+
executorService,
61+
sqlGenerator,
62+
destinationHandler,
63+
v1V2Migrator,
64+
v2TableMigrator,
65+
parsedCatalog);
66+
67+
List<DestinationInitialStatus<DestinationState>> destinationInitialStatuses = TyperDeduperUtil.executeRawTableMigrations(
68+
executorService,
69+
destinationHandler,
70+
migrations,
71+
destinationHandler.gatherInitialState(parsedCatalog.streams()));
72+
73+
// Commit the updated destination states.
74+
// We don't need to trigger any soft resets, because we don't have any final tables.
75+
destinationHandler.commitDestinationStates(destinationInitialStatuses.stream().collect(toMap(
76+
state -> state.streamConfig().id(),
77+
DestinationInitialStatus::destinationState)));
5278
}
5379

5480
@Override

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TypeAndDedupeTransaction {
2727
* @throws Exception if the safe query fails
2828
*/
2929
public static void executeTypeAndDedupe(final SqlGenerator sqlGenerator,
30-
final DestinationHandler destinationHandler,
30+
final DestinationHandler<?> destinationHandler,
3131
final StreamConfig streamConfig,
3232
final Optional<Instant> minExtractedAt,
3333
final String suffix)
@@ -63,7 +63,7 @@ public static void executeTypeAndDedupe(final SqlGenerator sqlGenerator,
6363
* @throws Exception if the safe query fails
6464
*/
6565
public static void executeSoftReset(final SqlGenerator sqlGenerator,
66-
final DestinationHandler destinationHandler,
66+
final DestinationHandler<?> destinationHandler,
6767
final StreamConfig streamConfig)
6868
throws Exception {
6969
LOGGER.info("Attempting soft reset for stream {} {}", streamConfig.id().originalNamespace(), streamConfig.id().originalName());

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt

+111-28
Original file line numberDiff line numberDiff line change
@@ -3,44 +3,122 @@ package io.airbyte.integrations.base.destination.typing_deduping
33
import com.google.common.collect.Streams
44
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
55
import io.airbyte.commons.concurrency.CompletableFutures
6+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
7+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
8+
import org.slf4j.Logger
9+
import org.slf4j.LoggerFactory
610
import java.util.*
711
import java.util.concurrent.CompletableFuture
812
import java.util.concurrent.CompletionStage
913
import java.util.concurrent.ExecutorService
10-
14+
import java.util.stream.Collectors.toMap
1115

1216
class TyperDeduperUtil {
1317
companion object {
18+
private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java)
1419

1520
@JvmStatic
16-
fun executeRawTableMigrations(
21+
fun <DestinationState: MinimumDestinationState> executeRawTableMigrations(
1722
executorService: ExecutorService,
18-
sqlGenerator: SqlGenerator,
19-
destinationHandler: DestinationHandler,
20-
v1V2Migrator: DestinationV1V2Migrator,
21-
v2TableMigrator: V2TableMigrator,
22-
parsedCatalog: ParsedCatalog
23-
) {
23+
destinationHandler: DestinationHandler<DestinationState>,
24+
migrations: List<Migration<DestinationState>>,
25+
initialStates: List<DestinationInitialStatus<DestinationState>>
26+
): List<DestinationInitialStatus<DestinationState>> {
2427
// TODO: Currently, either the migrations or the actual prepare-tables step runs the soft reset and creates v2 tables.
2528
// Unify that logic
2629
// with the current state of raw tables & final tables. This is done first, before gathering initial state,
2730
// to avoid recreating
2831
// final tables again later.
29-
val runMigrationsResult =
30-
CompletableFutures.allOf(parsedCatalog.streams().stream()
31-
.map { streamConfig -> runMigrationsAsync(executorService, sqlGenerator, destinationHandler, v1V2Migrator, v2TableMigrator, streamConfig) }
32-
.toList()).toCompletableFuture().join()
33-
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", runMigrationsResult)
32+
33+
// Run migrations in lockstep. Some migrations may require us to refetch the initial state.
34+
// We want to be able to batch those calls together across streams.
35+
// If a migration runs on one stream, it's likely to also run on other streams.
36+
// So we can bundle the gatherInitialState calls together.
37+
var currentStates = initialStates
38+
for (migration in migrations) {
39+
// Execute the migration on all streams in parallel
40+
val futures: Map<StreamId, CompletionStage<Migration.MigrationResult<DestinationState>>> = currentStates.stream()
41+
.collect(toMap(
42+
{ it.streamConfig.id },
43+
{ initialState -> runMigrationsAsync(executorService, destinationHandler, migration, initialState) }
44+
))
45+
val migrationResultFutures = CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join()
46+
getResultsOrLogAndThrowFirst("The following exceptions were thrown attempting to run migrations:\n", migrationResultFutures)
47+
val migrationResults: Map<StreamId, Migration.MigrationResult<DestinationState>> = futures.mapValues { it.value.toCompletableFuture().join() }
48+
49+
// Check if we need to refetch the DestinationInitialStatus for any stream
50+
val invalidatedStreams: Set<StreamId> = migrationResults.filter { it.value.invalidateInitialState }.keys
51+
val updatedStates: List<DestinationInitialStatus<DestinationState>>
52+
if (invalidatedStreams.isNotEmpty()) {
53+
LOGGER.info("Refetching initial state for streams: $invalidatedStreams")
54+
updatedStates = destinationHandler.gatherInitialState(currentStates.filter{invalidatedStreams.contains(it.streamConfig.id)}.map {it.streamConfig})
55+
LOGGER.info("Updated states: $updatedStates")
56+
} else {
57+
updatedStates = emptyList()
58+
}
59+
60+
// Update the DestinationInitialStates with the new DestinationStates,
61+
// and also update initialStates with the refetched states.
62+
currentStates = currentStates.map { initialState ->
63+
// migrationResults will always contain an entry for each stream, so we can safely use !!
64+
val updatedDestinationState = migrationResults[initialState.streamConfig.id]!!.updatedDestinationState
65+
if (invalidatedStreams.contains(initialState.streamConfig.id)) {
66+
// We invalidated this stream's DestinationInitialState.
67+
// Find the updated DestinationInitialState, and update it with our new DestinationState
68+
return@map updatedStates.filter{updatedState -> updatedState.streamConfig.id.equals(initialState.streamConfig.id)}
69+
.first()
70+
.copy(destinationState = updatedDestinationState)
71+
} else {
72+
// Just update the original DestinationInitialState with the new DestinationState.
73+
return@map initialState.copy(destinationState = updatedDestinationState)
74+
}
75+
}
76+
77+
}
78+
return currentStates
79+
}
80+
81+
/**
82+
* The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
83+
* initial state, because they're dumb and weird.
84+
* (specifically: SnowflakeV2TableMigrator inspects the final tables and triggers a soft reset
85+
* directly within the migration).
86+
* TODO: Migrate these migrations to the new migration system.
87+
* This will also reduce the number of times we need to query DB metadata, since (a) we can rely
88+
* on the gatherInitialState values, and (b) we can add a DestinationState field for these migrations.
89+
* It also enables us to not trigger multiple soft resets in a single sync.
90+
*/
91+
@JvmStatic
92+
fun <DestinationState> executeWeirdMigrations(
93+
executorService: ExecutorService,
94+
sqlGenerator: SqlGenerator,
95+
destinationHandler: DestinationHandler<DestinationState>,
96+
v1V2Migrator: DestinationV1V2Migrator,
97+
v2TableMigrator: V2TableMigrator,
98+
parsedCatalog: ParsedCatalog
99+
) {
100+
val futures = parsedCatalog.streams.map {
101+
CompletableFuture.supplyAsync(
102+
{
103+
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
104+
v2TableMigrator.migrateIfNecessary(it)
105+
},
106+
executorService
107+
)
108+
}
109+
getResultsOrLogAndThrowFirst(
110+
"The following exceptions were thrown attempting to run migrations:\n",
111+
CompletableFutures.allOf(futures.toList()).toCompletableFuture().join())
34112
}
35113

36114
/**
37115
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
38116
* exist in the Destination Database.
39117
*/
40118
@JvmStatic
41-
fun prepareSchemas(
119+
fun <DestinationState> prepareSchemas(
42120
sqlGenerator: SqlGenerator,
43-
destinationHandler: DestinationHandler,
121+
destinationHandler: DestinationHandler<DestinationState>,
44122
parsedCatalog: ParsedCatalog) {
45123
val rawSchema = parsedCatalog.streams.stream().map { it.id.rawNamespace }
46124
val finalSchema = parsedCatalog.streams.stream().map { it.id.finalNamespace }
@@ -52,20 +130,25 @@ class TyperDeduperUtil {
52130
destinationHandler.execute(Sql.concat(createAllSchemasSql))
53131
}
54132

55-
private fun runMigrationsAsync(
133+
private fun <DestinationState: MinimumDestinationState> runMigrationsAsync(
56134
executorService: ExecutorService,
57-
sqlGenerator: SqlGenerator,
58-
destinationHandler: DestinationHandler,
59-
v1V2Migrator: DestinationV1V2Migrator,
60-
v2TableMigrator: V2TableMigrator,
61-
streamConfig: StreamConfig): CompletionStage<Void> {
62-
return CompletableFuture.runAsync({
63-
try {
64-
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, streamConfig)
65-
v2TableMigrator.migrateIfNecessary(streamConfig)
66-
} catch (e: java.lang.Exception) {
67-
throw RuntimeException(e)
68-
}
135+
destinationHandler: DestinationHandler<DestinationState>,
136+
migration: Migration<DestinationState>,
137+
initialStatus: DestinationInitialStatus<DestinationState>
138+
): CompletionStage<Migration.MigrationResult<DestinationState>> {
139+
return CompletableFuture.supplyAsync({
140+
LOGGER.info("Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}.")
141+
142+
// We technically don't need to track this, but might as well hedge against migrations
143+
// accidentally setting softReset=false
144+
val softReset = initialStatus.destinationState.needsSoftReset()
145+
val migrationResult = migration.migrateIfNecessary(
146+
destinationHandler,
147+
initialStatus.streamConfig,
148+
initialStatus)
149+
val updatedNeedsSoftReset = softReset || migrationResult.updatedDestinationState.needsSoftReset()
150+
return@supplyAsync migrationResult.copy(
151+
updatedDestinationState = migrationResult.updatedDestinationState.withSoftReset(updatedNeedsSoftReset))
69152
}, executorService)
70153
}
71154
}

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package io.airbyte.integrations.base.destination.typing_deduping;
66

7+
/**
8+
* Prefer {@link io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration}
9+
* instead.
10+
*/
711
public interface V2TableMigrator {
812

913
void migrateIfNecessary(final StreamConfig streamConfig) throws Exception;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.airbyte.integrations.base.destination.typing_deduping.migrators
2+
3+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
4+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
5+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
6+
7+
/**
8+
* Migrations may do two things:
9+
* 1. Modify the raw table
10+
* 2. Trigger a soft reset
11+
*
12+
* The raw table modification should happen in [migrateIfNecessary]. However,
13+
* if multiple migrations want to trigger a soft reset, we should only trigger a single soft reset,
14+
* because soft resets are idempotent. There's no reason to trigger multiple soft resets in sequence,
15+
* and it would be a waste of warehouse compute to do so. Migrations MUST NOT directly run a soft reset
16+
* within [migrateIfNecessary].
17+
* <p>
18+
* Migrations are encouraged to store something into the destination State blob. This allows us to make
19+
* fewer queries into customer data. However, migrations MUST NOT rely solely on the state blob to trigger
20+
* migrations. It's possible for a state to not be committed after a migration runs (e.g. a well-timed
21+
* OOMKill). Therefore, if the state blob indicates that a migration is necessary, migrations must still
22+
* confirm against the database that the migration is necessary.
23+
*/
24+
interface Migration<DestinationState: MinimumDestinationState> {
25+
26+
/**
27+
* Perform the migration if it's necessary. Implementations of this method MUST check against the database
28+
* to confirm that the migration is still necessary, in case a previous migration ran, but failed
29+
* to update the state.
30+
*
31+
* Migrations MUST NOT set the `needsSoftReset` flag to false, but they MAY set it to true.
32+
*/
33+
fun migrateIfNecessary(
34+
destinationHandler: DestinationHandler<DestinationState>,
35+
stream: StreamConfig,
36+
state: DestinationInitialStatus<DestinationState>
37+
): MigrationResult<DestinationState>
38+
39+
/**
40+
* @param invalidateInitialState If true, the migration modified the raw tables in a way that requires us to re-gather initial state.
41+
*/
42+
data class MigrationResult<DestinationState>(
43+
val updatedDestinationState: DestinationState,
44+
val invalidateInitialState: Boolean
45+
)
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.airbyte.integrations.base.destination.typing_deduping.migrators
2+
3+
/**
4+
* All destination states MUST contain a parameter `needsSoftReset`. This allows migrators to track
5+
* whether a soft reset is necessary, and persist that value across syncs in case of sync failure.
6+
*/
7+
interface MinimumDestinationState {
8+
fun needsSoftReset(): Boolean
9+
10+
/**
11+
* The type parameter should be the subclass itself. We need this so that [withSoftReset] can
12+
* return the correct type. Callers are responsible for passing the correct type parameter
13+
* into this function (e.g. `currentState.withSoftReset<DestinationState>(softReset)`).
14+
*
15+
* Implementations generally look like this: (note the unchecked `as T` cast)
16+
* ```kotlin
17+
* data class ExampleState(val needsSoftReset: Boolean, <other fields...>): MinimumDestinationState {
18+
* override fun needsSoftReset(): Boolean {
19+
* return needsSoftReset
20+
* }
21+
*
22+
* override fun <T: MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
23+
* return copy(needsSoftReset = needsSoftReset) as T
24+
* }
25+
* }
26+
* ```
27+
*/
28+
fun <T: MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T
29+
30+
/**
31+
* A minimal implementation of [MinimumDestinationState]. This is useful for destinations that don't
32+
* want to bother implementing a full state object.
33+
*/
34+
data class Impl(val needsSoftReset: Boolean): MinimumDestinationState {
  /** Returns the soft-reset flag stored in this state. */
  override fun needsSoftReset(): Boolean {
    return needsSoftReset
  }

  /**
   * Returns a copy of this state whose soft-reset flag is set to [needsSoftReset].
   *
   * The `as T` cast is unchecked: the caller must request a type that [Impl] actually
   * satisfies, otherwise the failure surfaces at the call site rather than here.
   */
  override fun <T: MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
    // Pass the argument through. Hard-coding `true` here would make it impossible to
    // clear the flag and would force a soft reset for every caller of this method.
    return copy(needsSoftReset = needsSoftReset) as T
  }
}
43+
}

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java

+368-46
Large diffs are not rendered by default.

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public void testMismatchedSchemaThrowsException() throws Exception {
7474
public void testMigrate() throws Exception {
7575
final var sqlGenerator = new MockSqlGenerator();
7676
final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
77-
final DestinationHandler handler = Mockito.mock(DestinationHandler.class);
77+
final DestinationHandler<?> handler = Mockito.mock(DestinationHandler.class);
7878
final var sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table");
7979
// All is well
8080
final var migrator = noIssuesMigrator();

‎airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java

+59-28
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.Streams;
1818
import io.airbyte.commons.json.Jsons;
1919
import io.airbyte.commons.string.Strings;
20+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
2021
import io.airbyte.protocol.models.v0.DestinationSyncMode;
2122
import io.airbyte.protocol.models.v0.SyncMode;
2223
import java.time.Instant;
@@ -50,7 +51,7 @@
5051
* {@link #getDestinationHandler()} in a {@link org.junit.jupiter.api.BeforeEach} method.
5152
*/
5253
@Execution(ExecutionMode.CONCURRENT)
53-
public abstract class BaseSqlGeneratorIntegrationTest {
54+
public abstract class BaseSqlGeneratorIntegrationTest<DestinationState extends MinimumDestinationState> {
5455

5556
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSqlGeneratorIntegrationTest.class);
5657
/**
@@ -104,7 +105,7 @@ public abstract class BaseSqlGeneratorIntegrationTest {
104105
protected StreamConfig cdcIncrementalAppendStream;
105106

106107
protected SqlGenerator generator;
107-
protected DestinationHandler destinationHandler;
108+
protected DestinationHandler<DestinationState> destinationHandler;
108109
protected String namespace;
109110

110111
protected StreamId streamId;
@@ -114,7 +115,7 @@ public abstract class BaseSqlGeneratorIntegrationTest {
114115

115116
protected abstract SqlGenerator getSqlGenerator();
116117

117-
protected abstract DestinationHandler getDestinationHandler();
118+
protected abstract DestinationHandler<DestinationState> getDestinationHandler();
118119

119120
/**
120121
* Subclasses should override this method if they need to make changes to the stream ID. For
@@ -192,7 +193,6 @@ protected Map<String, String> getFinalMetadataColumnNames() {
192193
@BeforeEach
193194
public void setup() throws Exception {
194195
generator = getSqlGenerator();
195-
destinationHandler = getDestinationHandler();
196196

197197
final ColumnId id1 = generator.buildColumnId("id1");
198198
final ColumnId id2 = generator.buildColumnId("id2");
@@ -263,6 +263,8 @@ public void setup() throws Exception {
263263
Optional.of(cursor),
264264
cdcColumns);
265265

266+
destinationHandler = getDestinationHandler();
267+
266268
LOGGER.info("Running with namespace {}", namespace);
267269
createNamespace(namespace);
268270
}
@@ -272,8 +274,8 @@ public void teardown() throws Exception {
272274
teardownNamespace(namespace);
273275
}
274276

275-
private DestinationInitialState getDestinationInitialState(StreamConfig streamConfig) throws Exception {
276-
final List<DestinationInitialState> initialState =
277+
private DestinationInitialStatus<DestinationState> getDestinationInitialState(StreamConfig streamConfig) throws Exception {
278+
final List<DestinationInitialStatus<DestinationState>> initialState =
277279
destinationHandler.gatherInitialState(List.of(streamConfig));
278280
assertEquals(1, initialState.size(), "gatherInitialState returned the wrong number of futures");
279281
assertTrue(initialState.getFirst().isFinalTablePresent(), "Destination handler could not find existing table");
@@ -287,9 +289,9 @@ private DestinationInitialState getDestinationInitialState(StreamConfig streamCo
287289
public void detectNoSchemaChange() throws Exception {
288290
final Sql createTable = generator.createTable(incrementalDedupStream, "", false);
289291
destinationHandler.execute(createTable);
290-
final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream);
292+
final DestinationInitialStatus<DestinationState> destinationInitialStatus = getDestinationInitialState(incrementalDedupStream);
291293
assertFalse(
292-
destinationInitialState.isSchemaMismatch(),
294+
destinationInitialStatus.isSchemaMismatch(),
293295
"Unchanged schema was incorrectly detected as a schema change.");
294296
}
295297

@@ -303,9 +305,9 @@ public void detectColumnAdded() throws Exception {
303305
incrementalDedupStream.columns().put(
304306
generator.buildColumnId("new_column"),
305307
AirbyteProtocolType.STRING);
306-
final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream);
308+
final DestinationInitialStatus<DestinationState> destinationInitialStatus = getDestinationInitialState(incrementalDedupStream);
307309
assertTrue(
308-
destinationInitialState.isSchemaMismatch(),
310+
destinationInitialStatus.isSchemaMismatch(),
309311
"Adding a new column was not detected as a schema change.");
310312
}
311313

@@ -317,9 +319,9 @@ public void detectColumnRemoved() throws Exception {
317319
final Sql createTable = generator.createTable(incrementalDedupStream, "", false);
318320
destinationHandler.execute(createTable);
319321
incrementalDedupStream.columns().remove(generator.buildColumnId("string"));
320-
final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream);
322+
final DestinationInitialStatus<DestinationState> destinationInitialStatus = getDestinationInitialState(incrementalDedupStream);
321323
assertTrue(
322-
destinationInitialState.isSchemaMismatch(),
324+
destinationInitialStatus.isSchemaMismatch(),
323325
"Removing a column was not detected as a schema change.");
324326
}
325327

@@ -333,9 +335,9 @@ public void detectColumnChanged() throws Exception {
333335
incrementalDedupStream.columns().put(
334336
generator.buildColumnId("string"),
335337
AirbyteProtocolType.INTEGER);
336-
final DestinationInitialState destinationInitialState = getDestinationInitialState(incrementalDedupStream);
338+
final DestinationInitialStatus<DestinationState> destinationInitialStatus = getDestinationInitialState(incrementalDedupStream);
337339
assertTrue(
338-
destinationInitialState.isSchemaMismatch(),
340+
destinationInitialStatus.isSchemaMismatch(),
339341
"Altering a column was not detected as a schema change.");
340342
}
341343

@@ -373,7 +375,7 @@ public void incrementalDedupSameNameNamespace() throws Exception {
373375
verifyRecordCounts(1, rawRecords, 1, finalRecords);
374376
}
375377

376-
private DestinationInitialState getOnly(final List<DestinationInitialState> initialStates) {
378+
private DestinationInitialStatus<DestinationState> getOnly(final List<DestinationInitialStatus<DestinationState>> initialStates) {
377379
assertEquals(1, initialStates.size());
378380
return initialStates.getFirst();
379381
}
@@ -403,7 +405,7 @@ public void allTypes() throws Exception {
403405
streamId,
404406
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));
405407

406-
DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
408+
DestinationInitialStatus<DestinationState> initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
407409
assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D");
408410

409411
TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalDedupStream, Optional.empty(), "");
@@ -428,7 +430,7 @@ public void allTypesUnsafe() throws Exception {
428430
streamId,
429431
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_unsafe_inputrecords.jsonl"));
430432

431-
DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
433+
DestinationInitialStatus<DestinationState> initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
432434
assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D");
433435

434436
// Instead of using the full T+D transaction, explicitly run with useSafeCasting=false.
@@ -439,11 +441,11 @@ public void allTypesUnsafe() throws Exception {
439441
assertFalse(initialState.isFinalTableEmpty(), "Final table should not be empty after T+D");
440442
}
441443

442-
private InitialRawTableState getInitialRawTableState(StreamConfig streamConfig) throws Exception {
443-
List<DestinationInitialState> initialStates =
444+
private InitialRawTableStatus getInitialRawTableState(StreamConfig streamConfig) throws Exception {
445+
List<DestinationInitialStatus<DestinationState>> initialStates =
444446
destinationHandler.gatherInitialState(List.of(streamConfig));
445447
assertEquals(1, initialStates.size());
446-
return initialStates.getFirst().initialRawTableState();
448+
return initialStates.getFirst().initialRawTableStatus();
447449
}
448450

449451
/**
@@ -453,11 +455,11 @@ private InitialRawTableState getInitialRawTableState(StreamConfig streamConfig)
453455
@Test
454456
public void minTimestampBehavesCorrectly() throws Exception {
455457
// When the raw table doesn't exist, there are no unprocessed records and no timestamp
456-
assertEquals(new InitialRawTableState(false, Optional.empty()), getInitialRawTableState(incrementalAppendStream));
458+
assertEquals(new InitialRawTableStatus(false, false, Optional.empty()), getInitialRawTableState(incrementalAppendStream));
457459

458460
// When the raw table is empty, there are still no unprocessed records and no timestamp
459461
createRawTable(streamId);
460-
assertEquals(new InitialRawTableState(false, Optional.empty()), getInitialRawTableState(incrementalAppendStream));
462+
assertEquals(new InitialRawTableStatus(true, false, Optional.empty()), getInitialRawTableState(incrementalAppendStream));
461463

462464
// If we insert some raw records with null loaded_at, we should get the min extracted_at
463465
insertRawTableRecords(
@@ -479,7 +481,7 @@ public void minTimestampBehavesCorrectly() throws Exception {
479481
"_airbyte_data": {}
480482
}
481483
""")));
482-
InitialRawTableState tableState = getInitialRawTableState(incrementalAppendStream);
484+
InitialRawTableStatus tableState = getInitialRawTableState(incrementalAppendStream);
483485
assertTrue(tableState.hasUnprocessedRecords(),
484486
"When all raw records have null loaded_at, we should recognize that there are unprocessed records");
485487
assertTrue(
@@ -493,7 +495,7 @@ public void minTimestampBehavesCorrectly() throws Exception {
493495

494496
assertEquals(
495497
getInitialRawTableState(incrementalAppendStream),
496-
new InitialRawTableState(false, Optional.of(Instant.parse("2023-01-02T00:00:00Z"))),
498+
new InitialRawTableStatus(true, false, Optional.of(Instant.parse("2023-01-02T00:00:00Z"))),
497499
"When all raw records have non-null loaded_at, we should recognize that there are no unprocessed records, and the min timestamp should be equal to the latest extracted_at");
498500

499501
// If we insert another raw record with older extracted_at than the typed records, we should fetch a
@@ -549,7 +551,7 @@ public void handlePreexistingRecords() throws Exception {
549551
streamId,
550552
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));
551553

552-
final InitialRawTableState tableState = getInitialRawTableState(incrementalDedupStream);
554+
final InitialRawTableStatus tableState = getInitialRawTableState(incrementalDedupStream);
553555
assertAll(
554556
() -> assertTrue(tableState.hasUnprocessedRecords(),
555557
"After writing some raw records, we should recognize that there are unprocessed records"),
@@ -575,7 +577,7 @@ public void handleNoPreexistingRecords() throws Exception {
575577
generator.buildColumnId("IamACaseSensitiveColumnName"),
576578
AirbyteProtocolType.STRING);
577579
createRawTable(streamId);
578-
final InitialRawTableState tableState = getInitialRawTableState(incrementalDedupStream);
580+
final InitialRawTableStatus tableState = getInitialRawTableState(incrementalDedupStream);
579581
assertAll(
580582
() -> assertFalse(tableState.hasUnprocessedRecords(), "With an empty raw table, we should recognize that there are no unprocessed records"),
581583
() -> assertEquals(Optional.empty(), tableState.maxProcessedTimestamp(), "With an empty raw table, the min timestamp should be empty"));
@@ -900,7 +902,7 @@ public void testCdcOrdering_updateAfterDelete() throws Exception {
900902
streamId,
901903
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl"));
902904

903-
final InitialRawTableState tableState = getInitialRawTableState(cdcIncrementalDedupStream);
905+
final InitialRawTableStatus tableState = getInitialRawTableState(cdcIncrementalDedupStream);
904906
TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, cdcIncrementalDedupStream, tableState.maxProcessedTimestamp(), "");
905907

906908
verifyRecordCounts(
@@ -937,7 +939,7 @@ public void testCdcOrdering_insertAfterDelete() throws Exception {
937939
"",
938940
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl"));
939941

940-
final InitialRawTableState tableState = getInitialRawTableState(cdcIncrementalAppendStream);
942+
final InitialRawTableStatus tableState = getInitialRawTableState(cdcIncrementalAppendStream);
941943
TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, cdcIncrementalDedupStream, tableState.maxProcessedTimestamp(), "");
942944
verifyRecordCounts(
943945
2,
@@ -1245,6 +1247,35 @@ public void testCreateTableForce() throws Exception {
12451247
getDestinationInitialState(incrementalDedupStream);
12461248
}
12471249

1250+
@Test
1251+
public void testStateHandling() throws Exception {
1252+
// Fetch state from an empty destination. This should not throw an error.
1253+
final DestinationInitialStatus<DestinationState> initialState =
1254+
destinationHandler.gatherInitialState(List.of((incrementalDedupStream))).getFirst();
1255+
// The initial state should not need a soft reset.
1256+
assertFalse(initialState.destinationState().needsSoftReset(), "Empty state table should have needsSoftReset = false");
1257+
1258+
// Commit a state that now requires a soft reset.
1259+
destinationHandler.commitDestinationStates(Map.of(
1260+
incrementalDedupStream.id(),
1261+
initialState.destinationState().withSoftReset(true)));
1262+
final DestinationInitialStatus<DestinationState> updatedState =
1263+
destinationHandler.gatherInitialState(List.of((incrementalDedupStream))).getFirst();
1264+
// When we re-fetch the state, it should now need a soft reset.
1265+
assertTrue(updatedState.destinationState().needsSoftReset(), "After committing an explicit state, expected needsSoftReset = true");
1266+
1267+
// Commit a state belonging to a different stream
1268+
destinationHandler.commitDestinationStates(Map.of(
1269+
new StreamId(null, null, null, null, null, "some_other_stream"),
1270+
initialState.destinationState().withSoftReset(true)));
1271+
1272+
// Verify that we can still retrieve the state for the original stream
1273+
final DestinationInitialStatus<DestinationState> refetchedState =
1274+
destinationHandler.gatherInitialState(List.of((incrementalDedupStream))).getFirst();
1275+
// When we re-fetch the state, it should now need a soft reset.
1276+
assertTrue(refetchedState.destinationState().needsSoftReset(), "After committing an unrelated state, expected needsSoftReset = true");
1277+
}
1278+
12481279
protected void createFinalTable(final StreamConfig stream, final String suffix) throws Exception {
12491280
final Sql createTable = generator.createTable(stream, suffix, false);
12501281
destinationHandler.execute(createTable);

0 commit comments

Comments
 (0)
Please sign in to comment.