Skip to content

Commit 6b2bcad

Browse files
committed
nit fixes
1 parent ac8a70d commit 6b2bcad

File tree

3 files changed

+16
-23
lines changed

3 files changed

+16
-23
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,16 @@ private CompletionStage<DestinationInitialState> retrieveState(final StreamConfi
161161
return CompletableFuture.supplyAsync(() -> {
162162
try {
163163
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.id());
164-
// Only evaluate schema mismatch & final table emptiness if the final table exists.
165-
boolean isSchemaMismatch = false;
166-
boolean isFinalTableEmpty = true;
164+
final boolean isSchemaMismatch;
165+
final boolean isFinalTableEmpty;
167166
if (finalTableDefinition.isPresent()) {
168167
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, finalTableDefinition.get());
169168
isFinalTableEmpty = isFinalTableEmpty(streamConfig.id());
169+
} else {
170+
// If the final table doesn't exist, then by definition it doesn't have a schema mismatch and has no
171+
// records.
172+
isSchemaMismatch = false;
173+
isFinalTableEmpty = true;
170174
}
171175
final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id());
172176
return new DestinationInitialStateImpl(streamConfig, finalTableDefinition.isPresent(), initialRawTableState,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java

-6
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,6 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo
3232
return Sql.of("CREATE TABLE " + stream.id().finalTableId("", suffix));
3333
}
3434

35-
// @Override
36-
// public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final String
37-
// existingTable) throws TableNotMigratedException {
38-
// return false;
39-
// }
40-
4135
@Override
4236
public Sql updateTable(final StreamConfig stream,
4337
final String finalSuffix,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,11 @@ public void incrementalDedupSameNameNamespace() throws Exception {
373373
verifyRecordCounts(1, rawRecords, 1, finalRecords);
374374
}
375375

376+
private DestinationInitialState getOnly(final List<DestinationInitialState> initialStates) {
377+
assertEquals(1, initialStates.size());
378+
return initialStates.getFirst();
379+
}
380+
376381
/**
377382
* Run a full T+D update for an incremental-dedup stream, writing to a final table with "_foo"
378383
* suffix, with values for all data types. Verifies all behaviors for all types:
@@ -398,10 +403,7 @@ public void allTypes() throws Exception {
398403
streamId,
399404
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));
400405

401-
List<DestinationInitialState> initialStates =
402-
destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
403-
assertEquals(1, initialStates.size());
404-
DestinationInitialState initialState = initialStates.getFirst();
406+
DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
405407
assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D");
406408

407409
TypeAndDedupeTransaction.executeTypeAndDedupe(generator, destinationHandler, incrementalDedupStream, Optional.empty(), "");
@@ -411,9 +413,7 @@ public void allTypes() throws Exception {
411413
dumpRawTableRecords(streamId),
412414
"sqlgenerator/alltypes_expectedrecords_final.jsonl",
413415
dumpFinalTableRecords(streamId, ""));
414-
initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
415-
assertEquals(1, initialStates.size());
416-
initialState = initialStates.getFirst();
416+
initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
417417
assertFalse(initialState.isFinalTableEmpty(), "Final table should not be empty after T+D");
418418
}
419419

@@ -428,19 +428,14 @@ public void allTypesUnsafe() throws Exception {
428428
streamId,
429429
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_unsafe_inputrecords.jsonl"));
430430

431-
List<DestinationInitialState> initialStates =
432-
destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
433-
assertEquals(1, initialStates.size());
434-
DestinationInitialState initialState = initialStates.getFirst();
431+
DestinationInitialState initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
435432
assertTrue(initialState.isFinalTableEmpty(), "Final table should be empty before T+D");
436433

437434
// Instead of using the full T+D transaction, explicitly run with useSafeCasting=false.
438435
final Sql unsafeSql = generator.updateTable(incrementalDedupStream, "", Optional.empty(), false);
439436
destinationHandler.execute(unsafeSql);
440437

441-
initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
442-
assertEquals(1, initialStates.size());
443-
initialState = initialStates.getFirst();
438+
initialState = getOnly(destinationHandler.gatherInitialState(List.of(incrementalDedupStream)));
444439
assertFalse(initialState.isFinalTableEmpty(), "Final table should not be empty after T+D");
445440
}
446441

0 commit comments

Comments
 (0)