Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination postgres: upgrade cdk #35528

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.2
dockerImageTag: 2.0.3
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
plugins {
id 'airbyte-java-connector'
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.2'
cdkVersionRequired = '0.23.11'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.2
dockerImageTag: 2.0.3
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
import java.io.UnsupportedEncodingException;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresState;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -99,12 +100,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {

String encodedDatabase = config.get(JdbcUtils.DATABASE_KEY).asText();
if (encodedDatabase != null) {
try {
encodedDatabase = URLEncoder.encode(encodedDatabase, "UTF-8");
} catch (final UnsupportedEncodingException e) {
// Should never happen
e.printStackTrace();
}
encodedDatabase = URLEncoder.encode(encodedDatabase, StandardCharsets.UTF_8);
}
final String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s?",
config.get(JdbcUtils.HOST_KEY).asText(),
Expand Down Expand Up @@ -133,8 +129,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
return new PostgresDestinationHandler(databaseName, database);
protected JdbcDestinationHandler<PostgresState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
return new PostgresDestinationHandler(databaseName, database, rawTableSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.postgres.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
Expand All @@ -12,11 +13,12 @@
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import org.jooq.SQLDialect;

public class PostgresDestinationHandler extends JdbcDestinationHandler {
public class PostgresDestinationHandler extends JdbcDestinationHandler<PostgresState> {

public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase, String rawTableSchema) {
super(databaseName, jdbcDatabase, rawTableSchema, SQLDialect.POSTGRES);
}

@Override
Expand All @@ -33,6 +35,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
};
}

@Override
protected PostgresState toDestinationState(JsonNode json) {
return new PostgresState(
json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres.typing_deduping

import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

data class PostgresState(val needsSoftReset: Boolean) : MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}

override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T {
return copy(needsSoftReset = needsSoftReset) as T
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
Expand All @@ -31,7 +31,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest {
public class PostgresSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<PostgresState> {

private static PostgresTestDatabase testContainer;
private static String databaseName;
Expand Down Expand Up @@ -75,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected DestinationHandler getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database);
protected DestinationHandler<PostgresState> getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database, namespace);
}

@Override
Expand All @@ -95,11 +95,11 @@ public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
List<DestinationInitialStatus<PostgresState>> initialStatuses = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStatuses.size());
final DestinationInitialStatus<PostgresState> initialStatus = initialStatuses.getFirst();
assertTrue(initialStatus.isFinalTablePresent());
assertFalse(initialStatus.isSchemaMismatch());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ protected PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
/**
* Apply the postgresql.conf file that we've packaged as a resource.
*/
public void withConf(PostgreSQLContainer<?> container) {
public static void withConf(PostgreSQLContainer<?> container) {
container
.withCopyFileToContainer(
MountableFile.forClasspathResource("postgresql.conf"),
Expand All @@ -37,21 +37,14 @@ public void withConf(PostgreSQLContainer<?> container) {
/**
* Create a new network and bind it to the container.
*/
public void withNetwork(PostgreSQLContainer<?> container) {
public static void withNetwork(PostgreSQLContainer<?> container) {
container.withNetwork(Network.newNetwork());
}

/**
* Configure postgres with wal_level=logical.
*/
public void withWalLevelLogical(PostgreSQLContainer<?> container) {
container.withCommand("postgres -c wal_level=logical");
}

/**
* Generate SSL certificates and tell postgres to enable SSL and use them.
*/
public void withCert(PostgreSQLContainer<?> container) {
public static void withCert(PostgreSQLContainer<?> container) {
container.start();
String[] commands = {
"psql -U test -c \"CREATE USER postgres WITH PASSWORD 'postgres';\"",
Expand Down Expand Up @@ -97,7 +90,7 @@ public void withCert(PostgreSQLContainer<?> container) {
/**
* Tell postgres to enable SSL.
*/
public void withSSL(PostgreSQLContainer<?> container) {
public static void withSSL(PostgreSQLContainer<?> container) {
container.withCommand("postgres " +
"-c ssl=on " +
"-c ssl_cert_file=/var/lib/postgresql/server.crt " +
Expand All @@ -107,7 +100,7 @@ public void withSSL(PostgreSQLContainer<?> container) {
/**
* Configure postgres with client_encoding=sql_ascii.
*/
public void withASCII(PostgreSQLContainer<?> container) {
public static void withASCII(PostgreSQLContainer<?> container) {
container.withCommand("postgres -c client_encoding=sql_ascii");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.testutils.ContainerFactory.NamedContainerModifier;
import io.airbyte.cdk.testutils.TestDatabase;
import io.airbyte.commons.json.Jsons;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down Expand Up @@ -39,27 +41,30 @@ private BaseImage(String reference) {

}

public static enum ContainerModifier {
public enum ContainerModifier implements NamedContainerModifier<PostgreSQLContainer<?>> {

ASCII("withASCII"),
CONF("withConf"),
NETWORK("withNetwork"),
SSL("withSSL"),
WAL_LEVEL_LOGICAL("withWalLevelLogical"),
CERT("withCert"),
ASCII(PostgresContainerFactory::withASCII),
CONF(PostgresContainerFactory::withConf),
NETWORK(PostgresContainerFactory::withNetwork),
SSL(PostgresContainerFactory::withSSL),
CERT(PostgresContainerFactory::withCert),
;

private String methodName;
private Consumer<PostgreSQLContainer<?>> modifer;

private ContainerModifier(String methodName) {
this.methodName = methodName;
private ContainerModifier(final Consumer<PostgreSQLContainer<?>> modifer) {
this.modifer = modifer;
}

@Override
public Consumer<PostgreSQLContainer<?>> modifier() {
return modifer;
}

}

static public PostgresTestDatabase in(BaseImage baseImage, ContainerModifier... modifiers) {
String[] methodNames = Stream.of(modifiers).map(im -> im.methodName).toList().toArray(new String[0]);
final var container = new PostgresContainerFactory().shared(baseImage.reference, methodNames);
final var container = new PostgresContainerFactory().shared(baseImage.reference, modifiers);
return new PostgresTestDatabase(container).initialized();
}

Expand Down
Loading
Loading