Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Destination Postgres: Fix casing for raw table in T+D query" #34645

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public abstract class JdbcSqlGenerator implements SqlGenerator<TableDefinition>
private static final String TYPING_CTE_ALIAS = "intermediate_data";
private static final String NUMBERED_ROWS_CTE_ALIAS = "numbered_rows";

protected final NamingConventionTransformer namingTransformer;
private final NamingConventionTransformer namingTransformer;
protected final ColumnId cdcDeletedAtColumn;

public JdbcSqlGenerator(final NamingConventionTransformer namingTransformer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.16.2'
cdkVersionRequired = '0.13.1'
features = [
'db-sources', // required for tests
'db-destinations'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.16.2'
cdkVersionRequired = '0.14.0'
features = [
'db-sources', // required for tests
'db-destinations',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.1
dockerImageTag: 0.6.2
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -65,23 +64,6 @@ public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer)
super(namingTransformer);
}

@Override
public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
// There is a mismatch between convention used in create table query in SqlOperations vs this.
// For postgres specifically, when a create table is issued without a quoted identifier, it will be
// converted to lowercase.
// To keep it consistent when querying raw table in T+D query, convert it to lowercase.
// TODO: This logic should be unified across Raw and final table operations in a single class
// operating on a StreamId.
return new StreamId(
namingTransformer.getNamespace(namespace),
namingTransformer.convertStreamName(name),
namingTransformer.getNamespace(rawNamespaceOverride).toLowerCase(),
namingTransformer.convertStreamName(StreamId.concatenateRawTableName(namespace, name)).toLowerCase(),
namespace,
name);
}

@Override
protected DataType<?> getStructType() {
return JSONB_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,9 @@
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.List;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest {

Expand Down Expand Up @@ -80,31 +72,4 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {
return new PostgresSqlGeneratorIntegrationTest.PostgresSourceOperations();
}

@Test
public void testMixedCasedSchema() throws Exception {
streamName = "MixedCaseSchema" + streamName;
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(new AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA))));

// First sync
final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");

runSync(catalog, messages1);

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());
}

@Override
protected List<JsonNode> dumpRawTableRecords(String streamNamespace, String streamName) throws Exception {
return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase());
}

}
3 changes: 2 additions & 1 deletion docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 0.6.2 | 2024-01-29 | [34645](https://github.com/airbytehq/airbyte/pull/34645) | Revert Previous Release |
| 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. |
| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |
| 0.5.5 | 2024-01-18 | [34236](https://github.com/airbytehq/airbyte/pull/34236) | Upgrade CDK to 0.13.1; Add indexes in raw table for query optimization |
Expand All @@ -194,4 +195,4 @@ Now that you have set up the Postgres destination connector, check out the follo
| 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |